Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into reverted
Browse files Browse the repository at this point in the history
  • Loading branch information
awrichar committed Apr 4, 2023
2 parents 9aa6151 + a7226c9 commit c4d7c22
Show file tree
Hide file tree
Showing 31 changed files with 1,173 additions and 99 deletions.
12 changes: 11 additions & 1 deletion docs/reference/types/_includes/operation_description.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,14 @@ The diagram below shows the different types of operation that are performed by e
FireFly plugin type. The color coding (and numbers) map those different types of operation
to the [Transaction](./transaction.html) types that include those operations.

[![FireFly operations by transaction type](../../images/operations_by_transaction_type.jpg)](../../images/operations_by_transaction_type.jpg)
[![FireFly operations by transaction type](../../images/operations_by_transaction_type.jpg)](../../images/operations_by_transaction_type.jpg)

### Operation status

When initially created an operation is in `Initialized` state. Once the operation has been successfully sent to its respective plugin to be processed its
status moves to `Pending` state. This indicates that the plugin is processing the operation. The operation will then move to `Succeeded` or `Failed`
state depending on the outcome.

In the event that an operation could not be submitted to the plugin for processing, for example because the plugin's microservice was temporarily
unavailable, the operation will remain in `Initialized` state. Re-submitting the same FireFly API call using the same idempotency key will cause FireFly
to re-submit the operation to its plugin.
31 changes: 25 additions & 6 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/database/sqlcommon"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/core"
Expand Down Expand Up @@ -76,10 +77,16 @@ func (am *assetManager) TokenApproval(ctx context.Context, approval *core.TokenA

func (s *approveSender) resolveAndSend(ctx context.Context, method sendMethod) (err error) {
if !s.resolved {
if err = s.resolve(ctx); err != nil {
var opResubmit bool
if opResubmit, err = s.resolve(ctx); err != nil {
return err
}
s.resolved = true
if opResubmit {
// Operation had already been created on a previous call but never got submitted. We've resubmitted
// it now so no need to carry on
return nil
}
}

if method == methodSendAndWait && s.approval.Message != nil {
Expand All @@ -94,11 +101,23 @@ func (s *approveSender) resolveAndSend(ctx context.Context, method sendMethod) (
return s.sendInternal(ctx, method)
}

func (s *approveSender) resolve(ctx context.Context) (err error) {
func (s *approveSender) resolve(ctx context.Context) (opResubmitted bool, err error) {
// Create a transaction and attach to the approval
txid, err := s.mgr.txHelper.SubmitNewTransaction(ctx, core.TransactionTypeTokenApproval, s.approval.IdempotencyKey)
if err != nil {
return err
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
operation, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
err = resubmitErr
} else if operation != nil {
// We successfully resubmitted an initialized operation, return 2xx not 409
return true, nil
}
}
return false, err
}
s.approval.TX.ID = txid
s.approval.TX.Type = core.TransactionTypeTokenApproval
Expand All @@ -111,15 +130,15 @@ func (s *approveSender) resolve(ctx context.Context) (err error) {
}
s.msgSender, err = s.buildApprovalMessage(ctx, s.approval.Message)
if err != nil {
return err
return false, err
}
if err = s.msgSender.Prepare(ctx); err != nil {
return err
return false, err
}
s.approval.TokenApproval.Message = s.approval.Message.Header.ID
s.approval.TokenApproval.MessageHash = s.approval.Message.Hash
}
return nil
return false, err
}

func (s *approveSender) sendInternal(ctx context.Context, method sendMethod) (err error) {
Expand Down
103 changes: 103 additions & 0 deletions internal/assets/token_approval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/database/sqlcommon"
"github.com/hyperledger/firefly/internal/identity"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
Expand Down Expand Up @@ -216,6 +219,106 @@ func TestApprovalDefaultPoolSuccess(t *testing.T) {
mom.AssertExpectations(t)
}

func TestApprovalIdempotentOperationResubmit(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
var id = fftypes.NewUUID()

approval := &core.TokenApprovalInput{
TokenApproval: core.TokenApproval{
Approved: true,
Operator: "operator",
Key: "key",
},
IdempotencyKey: "idem1",
}

op := &core.Operation{}

mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
fb := database.TokenPoolQueryFactory.NewFilter(context.Background())
f := fb.And()
f.Limit(1).Count(true)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(op, nil)

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.TokenApproval(context.Background(), approval, false)
assert.NoError(t, err)

mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestApprovalIdempotentNoOperationToResubmit(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
var id = fftypes.NewUUID()

approval := &core.TokenApprovalInput{
TokenApproval: core.TokenApproval{
Approved: true,
Operator: "operator",
Key: "key",
},
IdempotencyKey: "idem1",
}

mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
fb := database.TokenPoolQueryFactory.NewFilter(context.Background())
f := fb.And()
f.Limit(1).Count(true)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, nil)

// If ResubmitOperations returns nil it's because there was no operation in initialized state, so we expect the regular 409 error back
_, err := am.TokenApproval(context.Background(), approval, false)
assert.Error(t, err)
assert.ErrorContains(t, err, "FF10431")

mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestApprovalIdempotentOperationErrorOnResubmit(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
var id = fftypes.NewUUID()

approval := &core.TokenApprovalInput{
TokenApproval: core.TokenApproval{
Approved: true,
Operator: "operator",
Key: "key",
},
IdempotencyKey: "idem1",
}

mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
fb := database.TokenPoolQueryFactory.NewFilter(context.Background())
f := fb.And()
f.Limit(1).Count(true)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, fmt.Errorf("pop"))

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.TokenApproval(context.Background(), approval, false)
assert.Error(t, err)
assert.ErrorContains(t, err, "pop")

mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestApprovalDefaultPoolNoPool(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
Expand Down
28 changes: 23 additions & 5 deletions internal/assets/token_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/database/sqlcommon"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/core"
)
Expand Down Expand Up @@ -75,31 +76,48 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *core.
})
}

var op *core.Operation
var newOperation *core.Operation
var resubmittedOperation *core.Operation
err = am.database.RunAsGroup(ctx, func(ctx context.Context) (err error) {
txid, err := am.txHelper.SubmitNewTransaction(ctx, core.TransactionTypeTokenPool, pool.IdempotencyKey)
if err != nil {
var resubmitErr error

// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers.
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmittedOperation, resubmitErr = am.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return resubmitErr
}
}
return err
}

pool.TX.ID = txid
pool.TX.Type = core.TransactionTypeTokenPool

op = core.NewOperation(
newOperation = core.NewOperation(
plugin,
am.namespace,
txid,
core.OpTypeTokenCreatePool)
if err = txcommon.AddTokenPoolCreateInputs(op, &pool.TokenPool); err == nil {
err = am.operations.AddOrReuseOperation(ctx, op)
if err = txcommon.AddTokenPoolCreateInputs(newOperation, &pool.TokenPool); err == nil {
err = am.operations.AddOrReuseOperation(ctx, newOperation)
}
return err
})
if resubmittedOperation != nil {
// We resubmitted a previously initialized operation, don't run a new one
return &pool.TokenPool, nil
}
if err != nil {
// Any other error? Return the error unchanged
return nil, err
}

_, err = am.operations.RunOperation(ctx, opCreatePool(op, &pool.TokenPool))
_, err = am.operations.RunOperation(ctx, opCreatePool(newOperation, &pool.TokenPool))
return &pool.TokenPool, err
}

Expand Down
Loading

0 comments on commit c4d7c22

Please sign in to comment.