diff --git a/internal/assets/token_pool.go b/internal/assets/token_pool.go index 37480afd93..fe26dd0b9f 100644 --- a/internal/assets/token_pool.go +++ b/internal/assets/token_pool.go @@ -90,10 +90,10 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp } if complete, err := plugin.CreateTokenPool(ctx, op.ID, pool); err != nil { + am.txHelper.WriteOperationFailure(ctx, op.ID, err) return nil, err } else if complete { - update := database.OperationQueryFactory.NewUpdate(ctx).Set("status", fftypes.OpStatusSucceeded) - return pool, am.database.UpdateOperation(ctx, op.ID, update) + am.txHelper.WriteOperationSuccess(ctx, op.ID, nil) } return pool, nil } @@ -114,10 +114,10 @@ func (am *assetManager) ActivateTokenPool(ctx context.Context, pool *fftypes.Tok } if complete, err := plugin.ActivateTokenPool(ctx, op.ID, pool, event); err != nil { + am.txHelper.WriteOperationFailure(ctx, op.ID, err) return err } else if complete { - update := database.OperationQueryFactory.NewUpdate(ctx).Set("status", fftypes.OpStatusSucceeded) - return am.database.UpdateOperation(ctx, op.ID, update) + am.txHelper.WriteOperationSuccess(ctx, op.ID, nil) } return nil } diff --git a/internal/assets/token_pool_test.go b/internal/assets/token_pool_test.go index a2568519b7..4cc7fc2615 100644 --- a/internal/assets/token_pool_test.go +++ b/internal/assets/token_pool_test.go @@ -210,6 +210,7 @@ func TestCreateTokenPoolFail(t *testing.T) { mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenPool).Return(fftypes.NewUUID(), nil) mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(false, fmt.Errorf("pop")) + mth.On("WriteOperationFailure", context.Background(), mock.Anything, fmt.Errorf("pop")) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) assert.Regexp(t, "pop", err) @@ -257,7 +258,7 @@ func TestCreateTokenPoolOpInsertFail(t *testing.T) { assert.Regexp(t, "pop", err) } -func TestCreateTokenPoolOpUpdateFail(t *testing.T) { +func TestCreateTokenPoolSyncSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -276,13 +277,13 @@ func TestCreateTokenPoolOpUpdateFail(t *testing.T) { mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenPool).Return(fftypes.NewUUID(), nil) mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - mdi.On("UpdateOperation", context.Background(), mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + mth.On("WriteOperationSuccess", context.Background(), mock.Anything, mock.Anything) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) - assert.Regexp(t, "pop", err) + assert.NoError(t, err) } -func TestCreateTokenPoolSuccess(t *testing.T) { +func TestCreateTokenPoolAsyncSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -411,17 +412,19 @@ func TestActivateTokenPoolFail(t *testing.T) { mdm := am.data.(*datamocks.Manager) mdi := am.database.(*databasemocks.Plugin) mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mth := am.txHelper.(*txcommonmocks.Helper) mdm.On("VerifyNamespaceExists", context.Background(), "ns1").Return(nil) mdi.On("InsertOperation", context.Background(), mock.MatchedBy(func(op *fftypes.Operation) bool { return op.Type == fftypes.OpTypeTokenActivatePool })).Return(nil) mti.On("ActivateTokenPool", context.Background(), mock.Anything, pool, ev).Return(false, fmt.Errorf("pop")) + mth.On("WriteOperationFailure", context.Background(), mock.Anything, fmt.Errorf("pop")) err := am.ActivateTokenPool(context.Background(), pool, ev) assert.EqualError(t, err, "pop") } -func TestActivateTokenPoolOpUpdateFail(t *testing.T) { +func TestActivateTokenPoolSyncSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -434,15 +437,16 @@ func TestActivateTokenPoolOpUpdateFail(t *testing.T) { mdm := am.data.(*datamocks.Manager) mdi := am.database.(*databasemocks.Plugin) mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mth := am.txHelper.(*txcommonmocks.Helper) mdm.On("VerifyNamespaceExists", context.Background(), "ns1").Return(nil) mdi.On("InsertOperation", context.Background(), mock.MatchedBy(func(op *fftypes.Operation) bool { return op.Type == fftypes.OpTypeTokenActivatePool })).Return(nil) mti.On("ActivateTokenPool", context.Background(), mock.Anything, pool, ev).Return(true, nil) - mdi.On("UpdateOperation", context.Background(), mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + mth.On("WriteOperationSuccess", context.Background(), mock.Anything, mock.Anything) err := am.ActivateTokenPool(context.Background(), pool, ev) - assert.EqualError(t, err, "pop") + assert.NoError(t, err) } func TestGetTokenPool(t *testing.T) { diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index b6a7bc6bad..6c6f7e6447 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/hyperledger/firefly/internal/i18n" - "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" @@ -272,20 +271,9 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er panic(fmt.Sprintf("unknown transfer type: %v", s.transfer.Type)) } - // if transaction fails, mark op as failed in DB if err != nil { - _ = s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { - l := log.L(ctx) - update := database.OperationQueryFactory.NewUpdate(ctx). - Set("status", fftypes.OpStatusFailed) - if err = s.mgr.database.UpdateOperation(ctx, op.ID, update); err != nil { - l.Errorf("Operation update failed: %s update=[ %s ]", err, update) - } - - return nil - }) + s.mgr.txHelper.WriteOperationFailure(ctx, op.ID, err) } - return err } diff --git a/internal/assets/token_transfer_test.go b/internal/assets/token_transfer_test.go index 16d26647b2..df8b1a05b7 100644 --- a/internal/assets/token_transfer_test.go +++ b/internal/assets/token_transfer_test.go @@ -411,7 +411,7 @@ func TestMintTokensFail(t *testing.T) { mti.On("MintTokens", context.Background(), mock.Anything, "F1", &mint.TokenTransfer).Return(fmt.Errorf("pop")) mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenTransfer).Return(fftypes.NewUUID(), nil) mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) - mdi.On("UpdateOperation", context.Background(), mock.Anything, mock.Anything).Return(nil) + mth.On("WriteOperationFailure", context.Background(), mock.Anything, fmt.Errorf("pop")) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.EqualError(t, err, "pop") @@ -441,7 +441,7 @@ func TestMintTokensFailAndDbFail(t *testing.T) { mti.On("MintTokens", context.Background(), mock.Anything, "F1", &mint.TokenTransfer).Return(fmt.Errorf("pop")) mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenTransfer).Return(fftypes.NewUUID(), nil) - mdi.On("UpdateOperation", context.Background(), mock.Anything, mock.Anything).Return(fmt.Errorf("Update fail")) + mth.On("WriteOperationFailure", context.Background(), mock.Anything, fmt.Errorf("pop")) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.EqualError(t, err, "pop") diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 6e71491e36..16ed940cc3 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -163,6 +163,21 @@ func (cm *contractManager) GetFFIs(ctx context.Context, ns string, filter databa return cm.database.GetFFIs(ctx, ns, filter) } +func (cm *contractManager) writeInvokeTransaction(ctx context.Context, ns string, input fftypes.JSONObject) (*fftypes.Operation, error) { + txid, err := cm.txHelper.SubmitNewTransaction(ctx, ns, fftypes.TransactionTypeContractInvoke) + if err != nil { + return nil, err + } + + op := fftypes.NewOperation( + cm.blockchain, + ns, + txid, + fftypes.OpTypeBlockchainInvoke) + op.Input = input + return op, cm.database.InsertOperation(ctx, op) +} + func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *fftypes.ContractCallRequest) (res interface{}, err error) { req.Key, err = cm.identity.ResolveSigningKey(ctx, req.Key) if err != nil { @@ -177,19 +192,13 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f if err := cm.validateInvokeContractRequest(ctx, req); err != nil { return err } - - txid, err := cm.txHelper.SubmitNewTransaction(ctx, ns, fftypes.TransactionTypeContractInvoke) - if err != nil { - return err + if req.Type == fftypes.CallTypeInvoke { + op, err = cm.writeInvokeTransaction(ctx, ns, req.Input) + if err != nil { + return err + } } - - op = fftypes.NewOperation( - cm.blockchain, - ns, - txid, - fftypes.OpTypeBlockchainInvoke) - op.Input = req.Input - return cm.database.InsertOperation(ctx, op) + return nil }) if err != nil { return nil, err @@ -197,13 +206,17 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f switch req.Type { case fftypes.CallTypeInvoke: - operationID := fftypes.NewUUID() - return cm.blockchain.InvokeContract(ctx, operationID, req.Key, req.Location, req.Method, req.Input) + res, err = cm.blockchain.InvokeContract(ctx, op.ID, req.Key, req.Location, req.Method, req.Input) case fftypes.CallTypeQuery: - return cm.blockchain.QueryContract(ctx, req.Location, req.Method, req.Input) + res, err = cm.blockchain.QueryContract(ctx, req.Location, req.Method, req.Input) default: panic(fmt.Sprintf("unknown call type: %s", req.Type)) } + + if op != nil && err != nil { + cm.txHelper.WriteOperationFailure(ctx, op.ID, err) + } + return res, err } func (cm *contractManager) InvokeContractAPI(ctx context.Context, ns, apiName, methodPath string, req *fftypes.ContractCallRequest) (interface{}, error) { diff --git a/internal/contracts/manager_test.go b/internal/contracts/manager_test.go index faee5e9f45..477cf498fc 100644 --- a/internal/contracts/manager_test.go +++ b/internal/contracts/manager_test.go @@ -1004,6 +1004,41 @@ func TestInvokeContract(t *testing.T) { mth.AssertExpectations(t) } +func TestInvokeContractFail(t *testing.T) { + cm := newTestContractManager() + mbi := cm.blockchain.(*blockchainmocks.Plugin) + mim := cm.identity.(*identitymanagermocks.Manager) + mdi := cm.database.(*databasemocks.Plugin) + mth := cm.txHelper.(*txcommonmocks.Helper) + + req := &fftypes.ContractCallRequest{ + Type: fftypes.CallTypeInvoke, + Interface: fftypes.NewUUID(), + Ledger: fftypes.JSONAnyPtr(""), + Location: fftypes.JSONAnyPtr(""), + Method: &fftypes.FFIMethod{ + Name: "doStuff", + ID: fftypes.NewUUID(), + Params: fftypes.FFIParams{}, + Returns: fftypes.FFIParams{}, + }, + } + + mth.On("SubmitNewTransaction", mock.Anything, "ns1", fftypes.TransactionTypeContractInvoke).Return(fftypes.NewUUID(), nil) + + mim.On("ResolveSigningKey", mock.Anything, "").Return("key-resolved", nil) + mdi.On("InsertOperation", mock.Anything, mock.MatchedBy(func(op *fftypes.Operation) bool { + return op.Namespace == "ns1" && op.Type == fftypes.OpTypeBlockchainInvoke && op.Plugin == "mockblockchain" + })).Return(nil) + mbi.On("InvokeContract", mock.Anything, mock.AnythingOfType("*fftypes.UUID"), "key-resolved", req.Location, req.Method, req.Input).Return(nil, fmt.Errorf("pop")) + mth.On("WriteOperationFailure", mock.Anything, mock.Anything, fmt.Errorf("pop")) + + _, err := cm.InvokeContract(context.Background(), "ns1", req) + + assert.EqualError(t, err, "pop") + mth.AssertExpectations(t) +} + func TestInvokeContractFailResolveSigningKey(t *testing.T) { cm := newTestContractManager() mim := cm.identity.(*identitymanagermocks.Manager) diff --git a/internal/database/sqlcommon/operation_sql.go b/internal/database/sqlcommon/operation_sql.go index 93112ae9b4..cd148792d1 100644 --- a/internal/database/sqlcommon/operation_sql.go +++ b/internal/database/sqlcommon/operation_sql.go @@ -152,7 +152,7 @@ func (s *SQLCommon) GetOperations(ctx context.Context, filter database.Filter) ( return ops, s.queryRes(ctx, tx, "operations", fop, fi), err } -func (s *SQLCommon) UpdateOperation(ctx context.Context, id *fftypes.UUID, update database.Update) (err error) { +func (s *SQLCommon) updateOperation(ctx context.Context, id *fftypes.UUID, update database.Update) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { @@ -174,3 +174,13 @@ func (s *SQLCommon) UpdateOperation(ctx context.Context, id *fftypes.UUID, updat return s.commitTx(ctx, tx, autoCommit) } + +func (s *SQLCommon) ResolveOperation(ctx context.Context, id *fftypes.UUID, status fftypes.OpStatus, errorMsg string, output fftypes.JSONObject) (err error) { + update := database.OperationQueryFactory.NewUpdate(ctx). + Set("status", status). + Set("error", errorMsg) + if output != nil { + update.Set("output", output) + } + return s.updateOperation(ctx, id, update) +} diff --git a/internal/database/sqlcommon/operation_sql_test.go b/internal/database/sqlcommon/operation_sql_test.go index f3c1b0ef60..27b4fdc1ac 100644 --- a/internal/database/sqlcommon/operation_sql_test.go +++ b/internal/database/sqlcommon/operation_sql_test.go @@ -90,12 +90,7 @@ func TestOperationE2EWithDB(t *testing.T) { assert.Equal(t, 0, len(operations)) // Update - updateTime := fftypes.Now() - up := database.OperationQueryFactory.NewUpdate(ctx). - Set("status", fftypes.OpStatusSucceeded). - Set("updated", updateTime). - Set("error", "") - err = s.UpdateOperation(ctx, operation.ID, up) + err = s.ResolveOperation(ctx, operation.ID, fftypes.OpStatusSucceeded, "", fftypes.JSONObject{"extra": "info"}) assert.NoError(t, err) // Test find updated value @@ -198,7 +193,7 @@ func TestOperationUpdateBeginFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) u := database.OperationQueryFactory.NewUpdate(context.Background()).Set("id", fftypes.NewUUID()) - err := s.UpdateOperation(context.Background(), fftypes.NewUUID(), u) + err := s.updateOperation(context.Background(), fftypes.NewUUID(), u) assert.Regexp(t, "FF10114", err) } @@ -206,7 +201,7 @@ func TestOperationUpdateBuildQueryFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() u := database.OperationQueryFactory.NewUpdate(context.Background()).Set("id", map[bool]bool{true: false}) - err := s.UpdateOperation(context.Background(), fftypes.NewUUID(), u) + err := s.updateOperation(context.Background(), fftypes.NewUUID(), u) assert.Regexp(t, "FF10149.*id", err) } @@ -216,6 +211,6 @@ func TestOperationUpdateFail(t *testing.T) { mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() u := database.OperationQueryFactory.NewUpdate(context.Background()).Set("id", fftypes.NewUUID()) - err := s.UpdateOperation(context.Background(), fftypes.NewUUID(), u) + err := s.updateOperation(context.Background(), fftypes.NewUUID(), u) assert.Regexp(t, "FF10117", err) } diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 5dabb4907a..71ac6efc65 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -269,11 +269,9 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string } } - update := database.OperationQueryFactory.NewUpdate(em.ctx). - Set("status", status). - Set("error", update.Error). - Set("output", update.Info) // Note that we don't need the manifest to be kept here, as it's already in the input - if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { + // Resolve the operation + // Note that we don't need the manifest to be kept here, as it's already in the input + if err := em.database.ResolveOperation(em.ctx, op.ID, status, update.Error, update.Info); err != nil { return true, err // this is always retryable } return false, nil diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index 3a298cf999..2334d22a76 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -19,6 +19,7 @@ package events import ( "encoding/json" "fmt" + "strings" "testing" "github.com/hyperledger/firefly/mocks/databasemocks" @@ -492,7 +493,9 @@ func TestTransferResultOk(t *testing.T) { ID: id, }, }, nil, nil) - mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, id, fftypes.OpStatusFailed, "error info", fftypes.JSONObject{ + "extra": "info", + }).Return(nil) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") @@ -501,7 +504,6 @@ func TestTransferResultOk(t *testing.T) { Info: fftypes.JSONObject{"extra": "info"}, }) assert.NoError(t, err) - } func TestTransferResultManifestMismatch(t *testing.T) { @@ -518,7 +520,11 @@ func TestTransferResultManifestMismatch(t *testing.T) { }, }, }, nil, nil) - mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, id, fftypes.OpStatusFailed, mock.MatchedBy(func(errorMsg string) bool { + return strings.Contains(errorMsg, "FF10329") + }), fftypes.JSONObject{ + "extra": "info", + }).Return(nil) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") @@ -538,9 +544,7 @@ func TestTransferResultNotCorrelated(t *testing.T) { defer cancel() mdi := em.database.(*databasemocks.Plugin) - id := fftypes.NewUUID() mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{}, nil, nil) - mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(nil) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") @@ -597,7 +601,9 @@ func TestTransferUpdateFail(t *testing.T) { ID: id, }, }, nil, nil) - mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(fmt.Errorf("pop")) + mdi.On("ResolveOperation", mock.Anything, id, fftypes.OpStatusFailed, "error info", fftypes.JSONObject{ + "extra": "info", + }).Return(fmt.Errorf("pop")) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 5d6f18be49..df2981dd0c 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -20,7 +20,6 @@ import ( "context" "github.com/hyperledger/firefly/internal/log" - "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -31,11 +30,7 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft return nil } - update := database.OperationQueryFactory.NewUpdate(ctx). - Set("status", txState). - Set("error", errorMessage). - Set("output", opOutput) - if err := em.database.UpdateOperation(ctx, op.ID, update); err != nil { + if err := em.database.ResolveOperation(ctx, op.ID, txState, errorMessage, opOutput); err != nil { return err } diff --git a/internal/events/operation_update_test.go b/internal/events/operation_update_test.go index a82c3c673c..c4f5242133 100644 --- a/internal/events/operation_update_test.go +++ b/internal/events/operation_update_test.go @@ -38,14 +38,14 @@ func TestOperationUpdateSuccess(t *testing.T) { opID := fftypes.NewUUID() txid := fftypes.NewUUID() + info := fftypes.JSONObject{"some": "info"} mdi.On("RunAsGroup", em.ctx, mock.Anything).Run(func(args mock.Arguments) { args[1].(func(ctx context.Context) error)(em.ctx) }).Return(nil) mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID, Transaction: txid}, nil) - mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, opID, fftypes.OpStatusFailed, "some error", info).Return(nil) mth.On("AddBlockchainTX", mock.Anything, txid, "0x12345").Return(nil) - info := fftypes.JSONObject{"some": "info"} err := em.OperationUpdate(mdi, opID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.NoError(t, err) @@ -78,10 +78,10 @@ func TestOperationUpdateError(t *testing.T) { opID := fftypes.NewUUID() txid := fftypes.NewUUID() + info := fftypes.JSONObject{"some": "info"} mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID, Transaction: txid}, nil) - mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(fmt.Errorf("pop")) + mdi.On("ResolveOperation", mock.Anything, opID, fftypes.OpStatusFailed, "some error", info).Return(fmt.Errorf("pop")) - info := fftypes.JSONObject{"some": "info"} err := em.operationUpdateCtx(em.ctx, opID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.EqualError(t, err, "pop") @@ -98,11 +98,11 @@ func TestOperationTXUpdateError(t *testing.T) { opID := fftypes.NewUUID() txid := fftypes.NewUUID() + info := fftypes.JSONObject{"some": "info"} mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID, Transaction: txid}, nil) - mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, opID, fftypes.OpStatusFailed, "some error", info).Return(nil) mth.On("AddBlockchainTX", mock.Anything, txid, "0x12345").Return(fmt.Errorf("pop")) - info := fftypes.JSONObject{"some": "info"} err := em.operationUpdateCtx(em.ctx, opID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.EqualError(t, err, "pop") @@ -123,15 +123,15 @@ func TestOperationUpdateTransferFail(t *testing.T) { Namespace: "ns1", Transaction: fftypes.NewUUID(), } + info := fftypes.JSONObject{"some": "info"} mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) - mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, op.ID, fftypes.OpStatusFailed, "some error", info).Return(nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" })).Return(nil) mth.On("AddBlockchainTX", mock.Anything, op.Transaction, "0x12345").Return(nil) - info := fftypes.JSONObject{"some": "info"} err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.NoError(t, err) @@ -152,13 +152,13 @@ func TestOperationUpdateTransferTransactionFail(t *testing.T) { Namespace: "ns1", Transaction: fftypes.NewUUID(), } + info := fftypes.JSONObject{"some": "info"} mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) - mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, op.ID, fftypes.OpStatusFailed, "some error", info).Return(nil) mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil) mth.On("AddBlockchainTX", mock.Anything, op.Transaction, "0x12345").Return(fmt.Errorf("pop")) - info := fftypes.JSONObject{"some": "info"} err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.EqualError(t, err, "pop") @@ -177,14 +177,14 @@ func TestOperationUpdateTransferEventFail(t *testing.T) { Type: fftypes.OpTypeTokenTransfer, Namespace: "ns1", } + info := fftypes.JSONObject{"some": "info"} mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) - mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) + mdi.On("ResolveOperation", mock.Anything, op.ID, fftypes.OpStatusFailed, "some error", info).Return(nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" })).Return(fmt.Errorf("pop")) - info := fftypes.JSONObject{"some": "info"} err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "0x12345", "some error", info) assert.EqualError(t, err, "pop") diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index 9822c8e6e7..0a9330c175 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -45,6 +45,13 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *fftypes.TokenPool if err := em.persistBlockchainEvent(ctx, chainEvent); err != nil { return err } + if op, err := em.findTXOperation(ctx, pool.TX.ID, fftypes.OpTypeTokenActivatePool); err != nil { + return err + } else if op == nil { + log.L(ctx).Warnf("No activate operation found for token pool transaction=%s", pool.TX.ID) + } else if err := em.database.ResolveOperation(ctx, op.ID, fftypes.OpStatusSucceeded, "", nil); err != nil { + return err + } if _, err := em.txHelper.PersistTransaction(ctx, pool.Namespace, pool.TX.ID, pool.TX.Type, blockchainTXID); err != nil { return err } @@ -57,12 +64,12 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *fftypes.TokenPool return em.database.InsertEvent(ctx, event) } -func (em *eventManager) findTokenPoolCreateOp(ctx context.Context, tx *fftypes.UUID) (*fftypes.Operation, error) { +func (em *eventManager) findTXOperation(ctx context.Context, tx *fftypes.UUID, opType fftypes.OpType) (*fftypes.Operation, error) { // Find a matching operation within this transaction fb := database.OperationQueryFactory.NewFilter(ctx) filter := fb.And( fb.Eq("tx", tx), - fb.Eq("type", fftypes.OpTypeTokenCreatePool), + fb.Eq("type", opType), ) if operations, _, err := em.database.GetOperations(ctx, filter); err != nil { return nil, err @@ -92,7 +99,7 @@ func (em *eventManager) shouldConfirm(ctx context.Context, pool *tokens.TokenPoo } func (em *eventManager) shouldAnnounce(ctx context.Context, pool *tokens.TokenPool) (announcePool *fftypes.TokenPool, err error) { - op, err := em.findTokenPoolCreateOp(ctx, pool.TransactionID) + op, err := em.findTXOperation(ctx, pool.TransactionID, fftypes.OpTypeTokenCreatePool) if err != nil { return nil, err } else if op == nil { diff --git a/internal/events/token_pool_created_test.go b/internal/events/token_pool_created_test.go index d58bc853d8..e02c074ba3 100644 --- a/internal/events/token_pool_created_test.go +++ b/internal/events/token_pool_created_test.go @@ -94,6 +94,7 @@ func TestTokenPoolCreatedConfirm(t *testing.T) { mti := &tokenmocks.Plugin{} mth := em.txHelper.(*txcommonmocks.Helper) + opID := fftypes.NewUUID() txID := fftypes.NewUUID() info := fftypes.JSONObject{"some": "info"} chainPool := &tokens.TokenPool{ @@ -130,6 +131,10 @@ func TestTokenPoolCreatedConfirm(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeBlockchainEvent })).Return(nil).Once() + mdi.On("GetOperations", em.ctx, mock.Anything).Return([]*fftypes.Operation{{ + ID: opID, + }}, nil, nil) + mdi.On("ResolveOperation", em.ctx, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil) mth.On("PersistTransaction", mock.Anything, "ns1", txID, fftypes.TransactionTypeTokenPool, "0xffffeeee").Return(true, nil).Once() mdi.On("UpsertTokenPool", em.ctx, storedPool).Return(nil).Once() mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { @@ -222,6 +227,7 @@ func TestTokenPoolCreatedMigrate(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeBlockchainEvent })).Return(nil).Once() + mdi.On("GetOperations", em.ctx, mock.Anything).Return([]*fftypes.Operation{}, nil, nil) mth.On("PersistTransaction", mock.Anything, "ns1", txID, fftypes.TransactionTypeTokenPool, "0xffffeeee").Return(true, nil).Once() mdi.On("UpsertTokenPool", em.ctx, storedPool).Return(nil).Once() mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { @@ -272,6 +278,78 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) { mdi.AssertExpectations(t) } +func TestConfirmPoolGetOpsFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + + txID := fftypes.NewUUID() + storedPool := &fftypes.TokenPool{ + Namespace: "ns1", + ID: fftypes.NewUUID(), + Key: "0x0", + State: fftypes.TokenPoolStatePending, + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: txID, + }, + } + event := &blockchain.Event{ + Name: "TokenPool", + } + + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Name == event.Name + })).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeBlockchainEvent + })).Return(nil) + mdi.On("GetOperations", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestConfirmPoolResolveOpFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + + opID := fftypes.NewUUID() + txID := fftypes.NewUUID() + storedPool := &fftypes.TokenPool{ + Namespace: "ns1", + ID: fftypes.NewUUID(), + Key: "0x0", + State: fftypes.TokenPoolStatePending, + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: txID, + }, + } + event := &blockchain.Event{ + Name: "TokenPool", + } + + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Name == event.Name + })).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeBlockchainEvent + })).Return(nil) + mdi.On("GetOperations", em.ctx, mock.Anything).Return([]*fftypes.Operation{{ + ID: opID, + }}, nil, nil) + mdi.On("ResolveOperation", em.ctx, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(fmt.Errorf("pop")) + + err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + func TestConfirmPoolTxFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -299,6 +377,7 @@ func TestConfirmPoolTxFail(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeBlockchainEvent })).Return(nil) + mdi.On("GetOperations", em.ctx, mock.Anything).Return([]*fftypes.Operation{}, nil, nil) mth.On("PersistTransaction", mock.Anything, "ns1", txID, fftypes.TransactionTypeTokenPool, "0xffffeeee").Return(false, fmt.Errorf("pop")) err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") @@ -334,6 +413,7 @@ func TestConfirmPoolUpsertFail(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeBlockchainEvent })).Return(nil) + mdi.On("GetOperations", em.ctx, mock.Anything).Return([]*fftypes.Operation{}, nil, nil) mth.On("PersistTransaction", mock.Anything, "ns1", txID, fftypes.TransactionTypeTokenPool, "0xffffeeee").Return(true, nil).Once() mdi.On("UpsertTokenPool", em.ctx, storedPool).Return(fmt.Errorf("pop")) diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index 28a10805a6..b2ad03856d 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -29,6 +29,8 @@ type Helper interface { SubmitNewTransaction(ctx context.Context, ns string, txType fftypes.TransactionType) (*fftypes.UUID, error) PersistTransaction(ctx context.Context, ns string, id *fftypes.UUID, txType fftypes.TransactionType, blockchainTXID string) (valid bool, err error) AddBlockchainTX(ctx context.Context, id *fftypes.UUID, blockchainTXID string) error + WriteOperationSuccess(ctx context.Context, opID *fftypes.UUID, output fftypes.JSONObject) + WriteOperationFailure(ctx context.Context, opID *fftypes.UUID, err error) } type transactionHelper struct { @@ -126,3 +128,15 @@ func (t *transactionHelper) AddBlockchainTX(ctx context.Context, id *fftypes.UUI return nil } + +func (t *transactionHelper) WriteOperationSuccess(ctx context.Context, opID *fftypes.UUID, output fftypes.JSONObject) { + if err2 := t.database.ResolveOperation(ctx, opID, fftypes.OpStatusSucceeded, "", output); err2 != nil { + log.L(ctx).Errorf("Failed to update operation %s: %s", opID, err2) + } +} + +func (t *transactionHelper) WriteOperationFailure(ctx context.Context, opID *fftypes.UUID, err error) { + if err2 := t.database.ResolveOperation(ctx, opID, fftypes.OpStatusFailed, err.Error(), nil); err2 != nil { + log.L(ctx).Errorf("Failed to update operation %s: %s", opID, err2) + } +} diff --git a/internal/txcommon/txcommon_test.go b/internal/txcommon/txcommon_test.go index c57f1945e8..e9982eb204 100644 --- a/internal/txcommon/txcommon_test.go +++ b/internal/txcommon/txcommon_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -375,3 +375,34 @@ func TestAddBlockchainTXUnchanged(t *testing.T) { mdi.AssertExpectations(t) } + +func TestWriteOperationSuccess(t *testing.T) { + + mdi := &databasemocks.Plugin{} + txHelper := NewTransactionHelper(mdi) + ctx := context.Background() + + opID := fftypes.NewUUID() + output := fftypes.JSONObject{"some": "info"} + mdi.On("ResolveOperation", ctx, opID, fftypes.OpStatusSucceeded, "", output).Return(fmt.Errorf("pop")) + + txHelper.WriteOperationSuccess(ctx, opID, output) + + mdi.AssertExpectations(t) + +} + +func TestWriteOperationFailure(t *testing.T) { + + mdi := &databasemocks.Plugin{} + txHelper := NewTransactionHelper(mdi) + ctx := context.Background() + + opID := fftypes.NewUUID() + mdi.On("ResolveOperation", ctx, opID, fftypes.OpStatusFailed, "pop", mock.Anything).Return(fmt.Errorf("pop")) + + txHelper.WriteOperationFailure(ctx, opID, fmt.Errorf("pop")) + + mdi.AssertExpectations(t) + +} diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index 846a4eefb4..17c1a0ec67 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -212,17 +212,3 @@ func (_m *Plugin) SubmitBatchPin(ctx context.Context, operationID *fftypes.UUID, return r0 } - -// ValidateContractLocation provides a mock function with given fields: ctx, location -func (_m *Plugin) ValidateContractLocation(ctx context.Context, location *fftypes.JSONAny) error { - ret := _m.Called(ctx, location) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.JSONAny) error); ok { - r0 = rf(ctx, location) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 4b180b74b2..6406b2613f 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2227,6 +2227,20 @@ func (_m *Plugin) ReplaceMessage(ctx context.Context, message *fftypes.Message) return r0 } +// ResolveOperation provides a mock function with given fields: ctx, id, status, errorMsg, output +func (_m *Plugin) ResolveOperation(ctx context.Context, id *fftypes.UUID, status fftypes.OpStatus, errorMsg string, output fftypes.JSONObject) error { + ret := _m.Called(ctx, id, status, errorMsg, output) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { + r0 = rf(ctx, id, status, errorMsg, output) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // RunAsGroup provides a mock function with given fields: ctx, fn func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error) error { ret := _m.Called(ctx, fn) @@ -2381,20 +2395,6 @@ func (_m *Plugin) UpdateOffset(ctx context.Context, rowID int64, update database return r0 } -// UpdateOperation provides a mock function with given fields: ctx, id, update -func (_m *Plugin) UpdateOperation(ctx context.Context, id *fftypes.UUID, update database.Update) error { - ret := _m.Called(ctx, id, update) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.UUID, database.Update) error); ok { - r0 = rf(ctx, id, update) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UpdateOrganization provides a mock function with given fields: ctx, id, update func (_m *Plugin) UpdateOrganization(ctx context.Context, id *fftypes.UUID, update database.Update) error { ret := _m.Called(ctx, id, update) diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index 131a9c10af..bfff72281f 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -71,3 +71,13 @@ func (_m *Helper) SubmitNewTransaction(ctx context.Context, ns string, txType ff return r0, r1 } + +// WriteOperationFailure provides a mock function with given fields: ctx, opID, err +func (_m *Helper) WriteOperationFailure(ctx context.Context, opID *fftypes.UUID, err error) { + _m.Called(ctx, opID, err) +} + +// WriteOperationSuccess provides a mock function with given fields: ctx, opID, output +func (_m *Helper) WriteOperationSuccess(ctx context.Context, opID *fftypes.UUID, output fftypes.JSONObject) { + _m.Called(ctx, opID, output) +} diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index fb283a0131..eba9bed8aa 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -53,8 +53,6 @@ type Plugin interface { // QueryContract executes a method via custom on-chain logic and returns the result QueryContract(ctx context.Context, location *fftypes.JSONAny, method *fftypes.FFIMethod, input map[string]interface{}) (interface{}, error) - ValidateContractLocation(ctx context.Context, location *fftypes.JSONAny) error - // AddSubscription adds a new subscription to a user-specified contract and event AddSubscription(ctx context.Context, subscription *fftypes.ContractSubscriptionInput) error diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 58e57da03c..52955e7d19 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -197,8 +197,8 @@ type iOperationCollection interface { // InsertOperation - Insert an operation InsertOperation(ctx context.Context, operation *fftypes.Operation) (err error) - // UpdateOperation - Update operation by ID - UpdateOperation(ctx context.Context, id *fftypes.UUID, update Update) (err error) + // ResolveOperation - Resolve operation upon completion + ResolveOperation(ctx context.Context, id *fftypes.UUID, status fftypes.OpStatus, errorMsg string, output fftypes.JSONObject) (err error) // GetOperationByID - Get an operation by ID GetOperationByID(ctx context.Context, id *fftypes.UUID) (operation *fftypes.Operation, err error) diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index 265e8735d9..4fa6354d68 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -58,6 +58,7 @@ type Named interface { // NewOperation creates a new operation in a transaction func NewOperation(plugin Named, namespace string, tx *UUID, opType OpType) *Operation { + now := Now() return &Operation{ ID: NewUUID(), Namespace: namespace, @@ -65,7 +66,8 @@ func NewOperation(plugin Named, namespace string, tx *UUID, opType OpType) *Oper Transaction: tx, Type: opType, Status: OpStatusPending, - Created: Now(), + Created: now, + Updated: now, } } diff --git a/pkg/fftypes/operation_test.go b/pkg/fftypes/operation_test.go index 82b01b0f0e..56220abf06 100644 --- a/pkg/fftypes/operation_test.go +++ b/pkg/fftypes/operation_test.go @@ -38,5 +38,6 @@ func TestNewPendingMessageOp(t *testing.T) { Type: OpTypePublicStorageBatchBroadcast, Status: OpStatusPending, Created: op.Created, + Updated: op.Created, }, *op) } diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 706b0b2e65..28f824d08e 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -63,29 +63,28 @@ type Plugin interface { // has completed. However, it does not matter if these events are workload balance between the firefly core // cluster instances of the node. type Callbacks interface { - // TokensOpUpdate notifies firefly of an update to this plugin's operation within a transaction. + // TokenOpUpdate notifies firefly of an update to this plugin's operation within a transaction. // Only success/failure and errorMessage (for errors) are modeled. // opOutput can be used to add opaque protocol specific JSON from the plugin (protocol transaction ID etc.) // Note this is an optional hook information, and stored separately to the confirmation of the actual event that was being submitted/sequenced. // Only the party submitting the transaction will see this data. // - // Error should will only be returned in shutdown scenarios + // Error should only be returned in shutdown scenarios TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error // TokenPoolCreated notifies on the creation of a new token pool, which might have been // submitted by us, or by any other authorized party in the network. // - // Error should will only be returned in shutdown scenarios + // Error should only be returned in shutdown scenarios TokenPoolCreated(plugin Plugin, pool *TokenPool) error // TokensTransferred notifies on a transfer between token accounts. // - // Error should will only be returned in shutdown scenarios + // Error should only be returned in shutdown scenarios TokensTransferred(plugin Plugin, transfer *TokenTransfer) error } -// Capabilities the supported featureset of the tokens -// interface implemented by the plugin, with the specified config +// Capabilities is the supported featureset of the tokens interface implemented by the plugin, with the specified config type Capabilities struct { } @@ -112,6 +111,8 @@ type TokenPool struct { } type TokenTransfer struct { + // Although not every field will be filled in, embed fftypes.TokenTransfer to avoid duplicating lots of fields + // Notable fields NOT expected to be populated by plugins: Namespace, LocalID, Pool fftypes.TokenTransfer // PoolProtocolID is the ID assigned to the token pool by the connector