Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type EventManager interface {
WaitStop()

// Bound blockchain callbacks
OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState blockchain.TransactionStatus, errorMessage string, opOutput fftypes.JSONObject) error
OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, status blockchain.TransactionStatus, errorMessage string, opOutput fftypes.JSONObject) error
BatchPinComplete(bi blockchain.Plugin, batch *blockchain.BatchPin, signingIdentity string) error
ContractEvent(event *blockchain.ContractEvent) error

Expand Down
45 changes: 33 additions & 12 deletions internal/events/operation_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,58 @@
package events

import (
"context"

"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)

func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error {
op, err := em.database.GetOperationByID(em.ctx, operationID)
func (em *eventManager) persistOpUpdate(ctx context.Context, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error {
op, err := em.database.GetOperationByID(ctx, operationID)
if err != nil || op == nil {
log.L(em.ctx).Warnf("Operation update '%s' ignored, as it was not submitted by this node", operationID)
log.L(ctx).Warnf("Operation update '%s' ignored, as it was not submitted by this node", operationID)
return nil
}

update := database.OperationQueryFactory.NewUpdate(em.ctx).
update := database.OperationQueryFactory.NewUpdate(ctx).
Set("status", txState).
Set("error", errorMessage).
Set("output", opOutput)
if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil {
if err := em.database.UpdateOperation(ctx, op.ID, update); err != nil {
return err
}

// Special handling for OpTypeTokenTransfer, which writes an event when it fails
if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed {
txUpdate := database.TransactionQueryFactory.NewUpdate(em.ctx).Set("status", txState)
if err := em.database.UpdateTransaction(em.ctx, op.Transaction, txUpdate); err != nil {
return err
// Special handling for operations that have cascading effects on other objects
switch op.Type {
case fftypes.OpTypeTokenTransfer:
if txState == fftypes.OpStatusFailed {
txUpdate := database.TransactionQueryFactory.NewUpdate(ctx).Set("status", txState)
if err := em.database.UpdateTransaction(ctx, op.Transaction, txUpdate); err != nil {
return err
}

event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID)
if err := em.database.InsertEvent(ctx, event); err != nil {
return err
}
}

event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID)
if err := em.database.InsertEvent(em.ctx, event); err != nil {
case fftypes.OpTypeContractInvoke:
txUpdate := database.TransactionQueryFactory.NewUpdate(ctx).Set("status", txState)
if err := em.database.UpdateTransaction(ctx, op.Transaction, txUpdate); err != nil {
return err
}
}

return nil
}

func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, status fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error {
return em.retry.Do(em.ctx, "persist operation status", func(attempt int) (bool, error) {
err := em.database.RunAsGroup(em.ctx, func(ctx context.Context) error {
return em.persistOpUpdate(ctx, operationID, status, errorMessage, opOutput)
})
return err != nil, err
})
}
75 changes: 44 additions & 31 deletions internal/events/operation_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package events

import (
"context"
"fmt"
"testing"

Expand All @@ -34,8 +35,8 @@ func TestOperationUpdateSuccess(t *testing.T) {
mbi := &blockchainmocks.Plugin{}

opID := fftypes.NewUUID()
mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID}, nil)
mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil)
mdi.On("GetOperationByID", mock.Anything, opID).Return(&fftypes.Operation{ID: opID}, nil)
mdi.On("UpdateOperation", mock.Anything, opID, mock.Anything).Return(nil)

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info)
Expand All @@ -49,111 +50,123 @@ func TestOperationUpdateNotFound(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)
mbi := &blockchainmocks.Plugin{}

opID := fftypes.NewUUID()
mdi.On("GetOperationByID", em.ctx, opID).Return(nil, fmt.Errorf("pop"))
mdi.On("GetOperationByID", mock.Anything, opID).Return(nil, fmt.Errorf("pop"))

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info)
err := em.persistOpUpdate(context.Background(), opID, fftypes.OpStatusFailed, "some error", info)
assert.NoError(t, err) // swallowed after logging

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}

func TestOperationUpdateError(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)
mbi := &blockchainmocks.Plugin{}

opID := fftypes.NewUUID()
mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID}, nil)
mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(fmt.Errorf("pop"))
mdi.On("GetOperationByID", mock.Anything, opID).Return(&fftypes.Operation{ID: opID}, nil)
mdi.On("UpdateOperation", mock.Anything, opID, mock.Anything).Return(fmt.Errorf("pop"))

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info)
err := em.persistOpUpdate(context.Background(), opID, fftypes.OpStatusFailed, "some error", info)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}

func TestOperationUpdateTransferFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)
mbi := &blockchainmocks.Plugin{}

op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Type: fftypes.OpTypeTokenTransfer,
Namespace: "ns1",
}

mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil)
mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool {
mdi.On("GetOperationByID", mock.Anything, op.ID).Return(op, nil)
mdi.On("UpdateOperation", mock.Anything, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", mock.Anything, op.Transaction, mock.Anything).Return(nil)
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *fftypes.Event) bool {
return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1"
})).Return(nil)

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info)
err := em.persistOpUpdate(context.Background(), op.ID, fftypes.OpStatusFailed, "some error", info)
assert.NoError(t, err)

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}

func TestOperationUpdateTransferTransactionFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)
mbi := &blockchainmocks.Plugin{}

op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Type: fftypes.OpTypeTokenTransfer,
Namespace: "ns1",
}

mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil)
mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(fmt.Errorf("pop"))
mdi.On("GetOperationByID", mock.Anything, op.ID).Return(op, nil)
mdi.On("UpdateOperation", mock.Anything, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", mock.Anything, op.Transaction, mock.Anything).Return(fmt.Errorf("pop"))

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info)
err := em.persistOpUpdate(context.Background(), op.ID, fftypes.OpStatusFailed, "some error", info)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}

func TestOperationUpdateTransferEventFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)
mbi := &blockchainmocks.Plugin{}

op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Type: fftypes.OpTypeTokenTransfer,
Namespace: "ns1",
}

mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil)
mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool {
mdi.On("GetOperationByID", mock.Anything, op.ID).Return(op, nil)
mdi.On("UpdateOperation", mock.Anything, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", mock.Anything, op.Transaction, mock.Anything).Return(nil)
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *fftypes.Event) bool {
return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1"
})).Return(fmt.Errorf("pop"))

info := fftypes.JSONObject{"some": "info"}
err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info)
err := em.persistOpUpdate(context.Background(), op.ID, fftypes.OpStatusFailed, "some error", info)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
}

func TestOperationUpdateContractInvokeTXFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
mdi := em.database.(*databasemocks.Plugin)

op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Type: fftypes.OpTypeContractInvoke,
Namespace: "ns1",
}

mdi.On("GetOperationByID", mock.Anything, op.ID).Return(op, nil)
mdi.On("UpdateOperation", mock.Anything, op.ID, mock.Anything).Return(nil)
mdi.On("UpdateTransaction", mock.Anything, op.Transaction, mock.Anything).Return(fmt.Errorf("pop"))

info := fftypes.JSONObject{"some": "info"}
err := em.persistOpUpdate(context.Background(), op.ID, fftypes.OpStatusFailed, "some error", info)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}