Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4709,6 +4709,7 @@ paths:
type: array
state:
enum:
- staged
- ready
- pending
- confirmed
Expand Down Expand Up @@ -4926,6 +4927,7 @@ paths:
type: array
state:
enum:
- staged
- ready
- pending
- confirmed
Expand Down Expand Up @@ -5307,6 +5309,7 @@ paths:
type: array
state:
enum:
- staged
- ready
- pending
- confirmed
Expand Down
2 changes: 1 addition & 1 deletion internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Manager interface {
GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error)

// Bound token callbacks
TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error
TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error

Start() error
WaitStop()
Expand Down
54 changes: 26 additions & 28 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,11 @@ import (

"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)

// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package
func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) {
op.Input = fftypes.JSONObject{
"id": transfer.LocalID.String(),
}
}

func (am *assetManager) GetTokenTransfers(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) {
return am.database.GetTokenTransfers(ctx, am.scopeNS(ns, filter))
}
Expand Down Expand Up @@ -180,41 +174,38 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, connector, poolN
}

func (s *transferSender) resolveAndSend(ctx context.Context, method sendMethod) (err error) {
var messageSender sysmessaging.MessageSender
if !s.resolved {
if messageSender, err = s.resolve(ctx); err != nil {
if err = s.resolve(ctx); err != nil {
return err
}
s.resolved = true
}

if messageSender != nil {
if method == methodSendAndWait {
if err = s.sendInternal(ctx, method); err != nil {
return err
}
return messageSender.SendAndWait(ctx)
}

if err := messageSender.Send(ctx); err != nil {
return err
}
if method == methodSendAndWait && s.transfer.Message != nil {
// Begin waiting for the message, and trigger the transfer.
// A successful transfer will trigger the message via the event handler, so we can wait for it all to complete.
_, err := s.mgr.syncasync.WaitForMessage(ctx, s.namespace, s.transfer.Message.Header.ID, func(ctx context.Context) error {
return s.sendInternal(ctx, methodSendAndWait)
})
return err
}

return s.sendInternal(ctx, method)
}

func (s *transferSender) resolve(ctx context.Context) (sender sysmessaging.MessageSender, err error) {
func (s *transferSender) resolve(ctx context.Context) error {
// Resolve the attached message
if s.transfer.Message != nil {
if sender, err = s.buildTransferMessage(ctx, s.namespace, s.transfer.Message); err != nil {
return nil, err
sender, err := s.buildTransferMessage(ctx, s.namespace, s.transfer.Message)
if err != nil {
return err
}
if err = sender.Prepare(ctx); err != nil {
return nil, err
return err
}
s.transfer.MessageHash = s.transfer.Message.Hash
}
return sender, nil
return nil
}

func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) error {
Expand Down Expand Up @@ -257,7 +248,7 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er
"",
fftypes.OpTypeTokenTransfer,
fftypes.OpStatusPending)
addTokenTransferInputs(op, &s.transfer.TokenTransfer)
txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer)

err = s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) {
pool, err := s.mgr.GetTokenPool(ctx, s.namespace, s.connector, s.poolName)
Expand All @@ -267,8 +258,15 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er
s.transfer.PoolProtocolID = pool.ProtocolID

err = s.mgr.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */)
if err == nil {
err = s.mgr.database.UpsertOperation(ctx, op, false)
if err != nil {
return err
}
if err = s.mgr.database.UpsertOperation(ctx, op, false); err != nil {
return err
}
if s.transfer.Message != nil {
s.transfer.Message.State = fftypes.MessageStateStaged
err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, false, false)
}
return err
})
Expand Down
107 changes: 14 additions & 93 deletions internal/assets/token_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,9 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) {
mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mms.On("Send", context.Background()).Return(nil)
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), false, false).Return(nil)

_, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false)
assert.NoError(t, err)
Expand All @@ -499,41 +501,6 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) {
mms.AssertExpectations(t)
}

func TestTransferTokensWithBroadcastMessageFail(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

transfer := &fftypes.TokenTransferInput{
TokenTransfer: fftypes.TokenTransfer{
From: "A",
To: "B",
},
Message: &fftypes.MessageInOut{
InlineData: fftypes.InlineData{
{
Value: []byte("test data"),
},
},
},
}
transfer.Amount.Int().SetInt64(5)

mim := am.identity.(*identitymanagermocks.Manager)
mbm := am.broadcast.(*broadcastmocks.Manager)
mms := &sysmessagingmocks.MessageSender{}
mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mms.On("Send", context.Background()).Return(fmt.Errorf("pop"))

_, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false)
assert.EqualError(t, err, "pop")

mbm.AssertExpectations(t)
mim.AssertExpectations(t)
mms.AssertExpectations(t)
}

func TestTransferTokensWithBroadcastPrepareFail(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
Expand Down Expand Up @@ -608,7 +575,9 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) {
mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil)
mpm.On("NewMessage", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mms.On("Send", context.Background()).Return(nil)
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), false, false).Return(nil)

_, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -751,73 +720,25 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) {
}), false).Return(nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mms.On("SendAndWait", context.Background()).Return(nil)
msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything).
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), false, false).Return(nil)
msa.On("WaitForMessage", context.Background(), "ns1", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
send := args[3].(syncasync.RequestSender)
send(context.Background())
}).
Return(&transfer.TokenTransfer, nil)

_, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true)
assert.NoError(t, err)
assert.Equal(t, *hash, *transfer.MessageHash)

mbm.AssertExpectations(t)
mim.AssertExpectations(t)
mdi.AssertExpectations(t)
mti.AssertExpectations(t)
mms.AssertExpectations(t)
msa.AssertExpectations(t)
}

func TestTransferTokensWithBroadcastConfirmTransferFail(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

hash := fftypes.NewRandB32()
transfer := &fftypes.TokenTransferInput{
TokenTransfer: fftypes.TokenTransfer{
From: "A",
To: "B",
},
Message: &fftypes.MessageInOut{
Message: fftypes.Message{
Hash: hash,
},
InlineData: fftypes.InlineData{
{
Value: []byte("test data"),
},
},
},
}
transfer.Amount.Int().SetInt64(5)

mdi := am.database.(*databasemocks.Plugin)
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mim := am.identity.(*identitymanagermocks.Manager)
mbm := am.broadcast.(*broadcastmocks.Manager)
mms := &sysmessagingmocks.MessageSender{}
msa := am.syncasync.(*syncasyncmocks.Bridge)
mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil)
mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil)
mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil)
mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool {
return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer
}), false).Return(nil)
mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
Return(&fftypes.Message{}, nil)
msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
send := args[3].(syncasync.RequestSender)
send(context.Background())
}).
Return(nil, fmt.Errorf("pop"))
Return(&transfer.TokenTransfer, nil)

_, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true)
assert.EqualError(t, err, "pop")
assert.NoError(t, err)
assert.Equal(t, *hash, *transfer.MessageHash)

mbm.AssertExpectations(t)
mim.AssertExpectations(t)
Expand Down
10 changes: 8 additions & 2 deletions internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"encoding/json"
"strconv"

"github.com/hyperledger/firefly/internal/broadcast"
"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/events/eifactory"
"github.com/hyperledger/firefly/internal/events/system"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/identity"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/privatemessaging"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/internal/syshandlers"
"github.com/hyperledger/firefly/internal/sysmessaging"
Expand Down Expand Up @@ -80,15 +82,17 @@ type eventManager struct {
retry retry.Retry
txhelper txcommon.Helper
aggregator *aggregator
broadcast broadcast.Manager
messaging privatemessaging.Manager
newEventNotifier *eventNotifier
newPinNotifier *eventNotifier
opCorrelationRetries int
defaultTransport string
internalEvents *system.Events
}

func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager) (EventManager, error) {
if pi == nil || di == nil || im == nil || dm == nil {
func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager) (EventManager, error) {
if pi == nil || di == nil || im == nil || dm == nil || bm == nil || pm == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
newPinNotifier := newEventNotifier(ctx, "pins")
Expand All @@ -100,6 +104,8 @@ func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.P
identity: im,
syshandlers: sh,
data: dm,
broadcast: bm,
messaging: pm,
retry: retry.Retry{
InitialDelay: config.GetDuration(config.EventAggregatorRetryInitDelay),
MaximumDelay: config.GetDuration(config.EventAggregatorRetryMaxDelay),
Expand Down
12 changes: 9 additions & 3 deletions internal/events/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/events/system"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/eventsmocks"
"github.com/hyperledger/firefly/mocks/identitymanagermocks"
"github.com/hyperledger/firefly/mocks/privatemessagingmocks"
"github.com/hyperledger/firefly/mocks/publicstoragemocks"
"github.com/hyperledger/firefly/mocks/syshandlersmocks"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand All @@ -43,8 +45,10 @@ func newTestEventManager(t *testing.T) (*eventManager, func()) {
met := &eventsmocks.Plugin{}
mdm := &datamocks.Manager{}
msh := &syshandlersmocks.SystemHandlers{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
met.On("Name").Return("ut").Maybe()
emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm)
emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm, mbm, mpm)
em := emi.(*eventManager)
rag := mdi.On("RunAsGroup", em.ctx, mock.Anything).Maybe()
rag.RunFn = func(a mock.Arguments) {
Expand Down Expand Up @@ -74,7 +78,7 @@ func TestStartStop(t *testing.T) {
}

func TestStartStopBadDependencies(t *testing.T) {
_, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil)
_, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil, nil, nil)
assert.Regexp(t, "FF10128", err)

}
Expand All @@ -87,7 +91,9 @@ func TestStartStopBadTransports(t *testing.T) {
mpi := &publicstoragemocks.Plugin{}
mdm := &datamocks.Manager{}
msh := &syshandlersmocks.SystemHandlers{}
_, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm)
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
_, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm, mbm, mpm)
assert.Regexp(t, "FF10172", err)

}
Expand Down
8 changes: 8 additions & 0 deletions internal/events/operation_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,13 @@ func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftyp
if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil {
return err
}

// Special handling for OpTypeTokenTransfer, which writes an event when it fails
if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mental note (not suggesting any change): the next time we find this pattern, we might want to do something a little generic around failure event mapping, rather than building a bunch of switches down here.

event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID)
if err := em.database.InsertEvent(em.ctx, event); err != nil {
return err
}
}
return nil
}
Loading