Skip to content

Commit

Permalink
Add test for processing stalled
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-ogrady committed Oct 5, 2020
1 parent 1df90e3 commit c8cb568
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 19 deletions.
32 changes: 16 additions & 16 deletions constructor/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,24 +193,24 @@ func (c *Coordinator) findJob(
return nil, ErrReturnFundsComplete
}

processing, err := c.storage.Processing(ctx, dbTx, string(job.RequestFunds))
if err != nil {
return nil, fmt.Errorf(
"%w: %s",
ErrJobsUnretrievable,
err.Error(),
)
}
if c.returnFundsWorkflow != nil {
processing, err := c.storage.Processing(ctx, dbTx, string(job.RequestFunds))
if err != nil {
return nil, fmt.Errorf(
"%w: %s",
ErrJobsUnretrievable,
err.Error(),
)
}

if len(processing) >= job.ReservedWorkflowConcurrency {
return nil, ErrNoAvailableJobs
}
if len(processing) >= job.ReservedWorkflowConcurrency {
return nil, ErrNoAvailableJobs
}

if c.requestFundsWorkflow != nil {
return job.New(c.requestFundsWorkflow), nil
}

return nil, ErrNoRemainingJobs
return nil, ErrStalled
}

// createTransaction constructs and signs a transaction with the provided intent.
Expand Down Expand Up @@ -488,10 +488,10 @@ func (c *Coordinator) process( // nolint:gocognit
c.resetVars()
return NoJobsWaitTime, nil
}
if errors.Is(err, ErrNoRemainingJobs) {
color.Cyan("no remaining jobs!")
if errors.Is(err, ErrStalled) {
color.Yellow("processing stalled")

return -1, nil
return -1, ErrStalled
}
if errors.Is(err, ErrReturnFundsComplete) {
color.Cyan("fund return complete!")
Expand Down
129 changes: 129 additions & 0 deletions constructor/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,3 +2335,132 @@ func TestReturnFunds(t *testing.T) {
jobStorage.AssertExpectations(t)
helper.AssertExpectations(t)
}

func TestNoReservedWorkflows(t *testing.T) {
ctx := context.Background()
jobStorage := &mocks.JobStorage{}
helper := &mocks.Helper{}
handler := &mocks.Handler{}
p := defaultParser(t)
workflows := []*job.Workflow{
{
Name: "transfer",
Concurrency: 1,
Scenarios: []*job.Scenario{
{
Name: "transfer",
Actions: []*job.Action{
{
Type: job.SetVariable,
Input: `{"network":"Testnet3", "blockchain":"Bitcoin"}`,
OutputPath: "transfer.network",
},
{
Type: job.SetVariable,
Input: `{"symbol":"tBTC", "decimals":8}`,
OutputPath: "currency",
},
{
Type: job.FindBalance,
Input: `{"minimum_balance":{"value": "100", "currency": {{currency}}}, "create_limit": 100}`, // nolint
OutputPath: "sender",
},
{
Type: job.Math,
Input: `{"operation":"subtraction", "left_value": "0", "right_value":{{sender.balance.value}}}`,
OutputPath: "sender_amount",
},
{
Type: job.FindBalance,
Input: `{"not_account_identifier":[{{sender.account_identifier}}], "minimum_balance":{"value": "0", "currency": {{currency}}}, "create_limit": 100}`, // nolint
OutputPath: "recipient",
},
{
Type: job.Math,
Input: `{"operation":"subtraction", "left_value":{{sender.balance.value}}, "right_value":"10"}`,
OutputPath: "recipient_amount",
},
{
Type: job.SetVariable,
Input: `"1"`,
OutputPath: "transfer.confirmation_depth",
},
{
Type: job.SetVariable,
Input: `{"test": "works"}`,
OutputPath: "transfer.preprocess_metadata",
},
{
Type: job.SetVariable,
Input: `[{"operation_identifier":{"index":0},"type":"Vin","status":"","account":{{sender.account_identifier}},"amount":{"value":{{sender_amount}},"currency":{{currency}}}},{"operation_identifier":{"index":1},"type":"Vout","status":"","account":{{recipient.account_identifier}},"amount":{"value":{{recipient_amount}},"currency":{{currency}}}}]`, // nolint
OutputPath: "transfer.operations",
},
},
},
{
Name: "print_transaction",
Actions: []*job.Action{
{
Type: job.PrintMessage,
Input: `{{transfer.transaction}}`,
},
},
},
},
},
}

c, err := New(
jobStorage,
helper,
handler,
p,
workflows,
)
assert.NotNil(t, c)
assert.NoError(t, err)

// Create coordination channels
processCanceled := make(chan struct{})

dir, err := utils.CreateTempDir()
assert.NoError(t, err)

db, err := storage.NewBadgerStorage(
ctx,
dir,
storage.WithIndexCacheSize(storage.TinyIndexCacheSize),
)
assert.NoError(t, err)
assert.NotNil(t, db)

helper.On("HeadBlockExists", ctx).Return(true).Once()

// Attempt to transfer
// We use a "read" database transaction in this test because we mock
// all responses from the database and "write" transactions require a
// lock. While it would be possible to orchestrate these locks in this
// test, it is simpler to just use a "read" transaction.
dbTxFail := db.NewDatabaseTransaction(ctx, false)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once()
jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTxFail, "transfer").Return([]*job.Job{}, nil).Once()
helper.On("AllAccounts", ctx, dbTxFail).Return([]*types.AccountIdentifier{}, nil).Once()

// Start processor
go func() {
err := c.Process(ctx)
assert.True(t, errors.Is(err, ErrStalled))
close(processCanceled)
}()

helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx2 := db.NewDatabaseTransaction(ctx, false)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once()

<-processCanceled
jobStorage.AssertExpectations(t)
helper.AssertExpectations(t)
}
7 changes: 4 additions & 3 deletions constructor/coordinator/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ var (
// is <= 0.
ErrInvalidConcurrency = errors.New("invalid concurrency")

// ErrNoRemainingJobs is returned when the caller does not define
// a RequestFunds workflow and we run out of funds.
ErrNoRemainingJobs = errors.New("no remaining jobs")
// ErrStalled is returned when the caller does not define
// a CreateAccount and/or RequestFunds workflow and we run out
// of available options (i.e. we can't do anything).
ErrStalled = errors.New("processing stalled")
)

0 comments on commit c8cb568

Please sign in to comment.