Skip to content

Commit

Permalink
Refactor process code to support return funds
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-ogrady committed Sep 15, 2020
1 parent f245600 commit 4f99c9a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
63 changes: 41 additions & 22 deletions constructor/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ func New(
workflowNames[i] = workflow.Name

if workflow.Name == string(job.CreateAccount) {
if createAccountWorkflow != nil {
return nil, ErrDuplicateWorkflows
}

if workflow.Concurrency != job.ReservedWorkflowConcurrency {
return nil, ErrIncorrectConcurrency
}
Expand All @@ -65,10 +61,6 @@ func New(
}

if workflow.Name == string(job.RequestFunds) {
if requestFundsWorkflow != nil {
return nil, ErrDuplicateWorkflows
}

if workflow.Concurrency != job.ReservedWorkflowConcurrency {
return nil, ErrIncorrectConcurrency
}
Expand All @@ -78,10 +70,6 @@ func New(
}

if workflow.Name == string(job.ReturnFunds) {
if returnFundsWorkflow != nil {
return nil, ErrDuplicateWorkflows
}

returnFundsWorkflow = workflow
continue
}
Expand Down Expand Up @@ -116,6 +104,7 @@ func New(
func (c *Coordinator) findJob(
ctx context.Context,
dbTx storage.DatabaseTransaction,
returnFunds bool,
) (*job.Job, error) {
// Look for any jobs ready for processing. If one is found,
// we return that as the next job to process.
Expand All @@ -136,9 +125,16 @@ func (c *Coordinator) findJob(
}

// Attempt non-reserved workflows
for _, workflow := range c.workflows {
// TODO: only attempt "return_funds" workflow...if doesn't exist,
// return ErrReturnComplete
availableWorkflows := c.workflows
if returnFunds {
if c.returnFundsWorkflow == nil {
return nil, ErrReturnFundsComplete
}

availableWorkflows = append(availableWorkflows, c.returnFundsWorkflow)
}

for _, workflow := range availableWorkflows {
if utils.ContainsString(c.attemptedWorkflows, workflow.Name) {
continue
}
Expand Down Expand Up @@ -173,9 +169,12 @@ func (c *Coordinator) findJob(
return nil, ErrNoAvailableJobs
}

// TODO: if in "return mode" and reach here, we should exit as
// returns are complete.
// TODO: create new error type for this scenario
// If we are returning funds, we should exit here
// because we don't want to create any new accounts
// or request funds while returning funds.
if returnFunds {
return nil, ErrReturnFundsComplete
}

// Check if ErrCreateAccount, then create account if less
// processing CreateAccount jobs than ReservedWorkflowConcurrency.
Expand Down Expand Up @@ -458,10 +457,11 @@ func (c *Coordinator) invokeHandlersAndBroadcast(
return nil
}

// Process creates and executes jobs
// until failure.
func (c *Coordinator) Process( // nolint:gocognit
// process orchestrates the execution of workflows
// and the broadcast of transactions.
func (c *Coordinator) process( // nolint:gocognit
ctx context.Context,
returnFunds bool,
) error {
for ctx.Err() == nil {
if !c.helper.HeadBlockExists(ctx) {
Expand All @@ -481,7 +481,7 @@ func (c *Coordinator) Process( // nolint:gocognit
dbTx := c.helper.DatabaseTransaction(ctx)

// Attempt to find a Job to process.
j, err := c.findJob(ctx, dbTx)
j, err := c.findJob(ctx, dbTx, returnFunds)
if errors.Is(err, ErrNoAvailableJobs) {
log.Println("waiting for available jobs...")

Expand Down Expand Up @@ -590,3 +590,22 @@ func (c *Coordinator) Process( // nolint:gocognit

return ctx.Err()
}

// Process creates and executes jobs
// until failure.
func (c *Coordinator) Process(
ctx context.Context,
) error {
return c.process(ctx, false)
}

// ReturnFunds attempts to execute
// the ReturnFunds workflow until
// it is no longer satisfiable. This
// is typically called on shutdown
// to return funds to a faucet.
func (c *Coordinator) ReturnFunds(
ctx context.Context,
) error {
return c.process(ctx, true)
}
5 changes: 5 additions & 0 deletions constructor/coordinator/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ var (
// and retry.
ErrNoAvailableJobs = errors.New("no jobs available")

// ErrReturnFundsComplete is returned when it is not possible
// to process any more ReturnFundsWorkflows or when there is no provided
// ReturnsFundsWorkflow.
ErrReturnFundsComplete = errors.New("return funds complete")

// ErrCreateAccountWorkflowMissing is returned when we want
// to create an account but the create account workflow is missing.
ErrCreateAccountWorkflowMissing = errors.New("create account workflow missing")
Expand Down

0 comments on commit 4f99c9a

Please sign in to comment.