Skip to content

Commit

Permalink
Implement basic retry framework (#11).
Browse files Browse the repository at this point in the history
  • Loading branch information
tintoy committed Nov 28, 2016
1 parent f6eaa18 commit ae1407b
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 0 deletions.
6 changes: 6 additions & 0 deletions retry/action.go
@@ -0,0 +1,6 @@
package retry

// ActionFunc is a function representing a retryable operation that does not directly return any value.
//
// The function receives a Context it can use to log progress, request a retry, or fail the iteration.
// Feel free to publish values from the function to variables in the enclosing scope.
type ActionFunc func(context Context)
74 changes: 74 additions & 0 deletions retry/context.go
@@ -0,0 +1,74 @@
package retry

import (
"fmt"
"log"
)

// Context represents contextual information about the current iteration of a retryable operation.
//
// An ActionFunc uses its Context to report progress and control whether the runner
// retries, fails, or completes the operation.
type Context interface {
	// Log writes a formatted message to the log.
	Log(format string, v ...interface{})

	// Retry marks the operation for retry once the current iteration completes.
	Retry()

	// Fail marks the current iteration as failed due to the specified error.
	Fail(err error)
}

// newRunnerContext creates a new runnerContext for the operation with the specified description.
//
// The remaining fields start at their zero values: no iterations performed,
// no retry requested, and no error recorded.
func newRunnerContext(operationDescription string) *runnerContext {
	// Fixed: the original initialiser referenced non-existent fields ("Retry", "Err");
	// the struct declares ShouldRetry and Error.
	return &runnerContext{
		OperationDescription: operationDescription,
		IterationCount:       0,
		ShouldRetry:          false,
		Error:                nil,
	}
}

// runnerContext is the default implementation of Context used by retryRunner.
type runnerContext struct {
	// OperationDescription is a short description of the operation, used for logging.
	OperationDescription string

	// IterationCount is the number of attempts made so far (1-based once the first attempt starts).
	IterationCount int

	// ShouldRetry indicates that the current iteration requested a retry via Retry().
	ShouldRetry bool

	// Error holds the error (if any) passed to Fail() during the current iteration.
	Error error
}

// Compile-time check that runnerContext implements Context.
var _ Context = &runnerContext{}

// Log writes a formatted message to the log.
func (context *runnerContext) Log(format string, v ...interface{}) {
	log.Printf(format, v...)
}

// Retry marks the operation for retry once the current iteration completes.
//
// The flag is cleared again by NextIteration before the next attempt runs.
func (context *runnerContext) Retry() {
	context.ShouldRetry = true
}

// Fail marks the current iteration as failed due to the specified error.
//
// A nil err records success without logging; a non-nil err is recorded and logged
// (with a "(retry N)" suffix when this is not the first attempt).
func (context *runnerContext) Fail(err error) {
	context.Error = err

	if err != nil {
		// Fixed: the original referenced non-existent lowercase fields
		// (iterationCount / operationDescription); the struct's fields are exported.
		iterationDescription := ""
		if context.IterationCount > 1 {
			iterationDescription = fmt.Sprintf(" (retry %d)",
				context.IterationCount,
			)
		}

		log.Printf("%s%s failed: %s",
			context.OperationDescription,
			iterationDescription,
			err,
		)
	}
}

// NextIteration resets the context for the next iteration of the operation.
//
// It clears the per-iteration retry flag and error, then bumps the attempt counter.
func (context *runnerContext) NextIteration() {
	context.ShouldRetry = false
	context.Error = nil // Fixed: original assigned non-existent field "Err".
	context.IterationCount++
}
33 changes: 33 additions & 0 deletions retry/errors.go
@@ -0,0 +1,33 @@
package retry

import (
	"fmt"
	"time"
)

// IsTimeoutError determines whether the specified error represents an operation timeout.
func IsTimeoutError(err error) bool {
	if _, isTimeout := err.(*OperationTimeoutError); isTimeout {
		return true
	}

	return false
}

// OperationTimeoutError is raised when the timeout for an operation is exceeded.
type OperationTimeoutError struct {
	// OperationDescription is a short description of the operation that timed out.
	OperationDescription string

	// Timeout is the period of time after which the operation was abandoned.
	Timeout time.Duration

	// Attempts is the number of attempts that were made to perform the operation.
	Attempts int
}

// Error creates a string representation of the OperationTimeoutError.
func (timeoutError *OperationTimeoutError) Error() string {
	timeoutSeconds := timeoutError.Timeout / time.Second

	return fmt.Sprintf("%s - operation timed out after %d seconds (%d attempts)",
		timeoutError.OperationDescription,
		timeoutSeconds,
		timeoutError.Attempts,
	)
}

// Compile-time check that OperationTimeoutError implements error.
var _ error = &OperationTimeoutError{}
17 changes: 17 additions & 0 deletions retry/retry.go
@@ -0,0 +1,17 @@
package retry

import "time"

// Default is the default Runner for retries (retry period of 30 seconds).
var Default = NewRunner(30 * time.Second)

// DoAction performs the specified action until it succeeds or times out, using the Default runner.
//
// description is a short description of the operation, used for logging.
// timeout is the period of time after which the operation is considered to have failed.
// action is the action function to invoke.
//
// Returns the error (if any) passed to Context.Fail or caused by the operation timing out.
func DoAction(description string, timeout time.Duration, action ActionFunc) error {
	return Default.DoAction(description, timeout, action)
}
143 changes: 143 additions & 0 deletions retry/runner.go
@@ -0,0 +1,143 @@
package retry

import (
"log"
"sync"
"time"
)

// Runner is used to execute retriable operations.
type Runner interface {
	// GetRetryPeriod retrieves the runner's currently-configured retry period.
	//
	// This determines how often the Runner will retry operations.
	GetRetryPeriod() time.Duration

	// SetRetryPeriod configures the runner's retry period.
	//
	// This determines how long the Runner will wait between retries of an operation.
	SetRetryPeriod(retryPeriod time.Duration)

	// DoAction performs the specified action until it succeeds or times out.
	//
	// description is a short description of the operation, used for logging.
	// timeout is the period of time after which the operation is considered to have failed.
	// action is the action function to invoke.
	//
	// Returns the error (if any) passed to Context.Fail or caused by the operation timing out.
	DoAction(description string, timeout time.Duration, action ActionFunc) error
}

// NewRunner creates a new Runner with the specified retry period.
func NewRunner(retryPeriod time.Duration) Runner {
	newRunner := &retryRunner{
		stateLock:   &sync.Mutex{},
		retryPeriod: retryPeriod,
	}

	return newRunner
}

// retryRunner is the default implementation of Runner.
type retryRunner struct {
	// stateLock guards access to the mutable configuration below.
	stateLock *sync.Mutex

	// retryPeriod is how long the runner waits between attempts.
	retryPeriod time.Duration
}

// Compile-time check that retryRunner implements Runner.
var _ Runner = &retryRunner{}

// GetRetryPeriod retrieves the runner's currently-configured retry period.
//
// This determines how often the Runner will retry operations.
func (runner *retryRunner) GetRetryPeriod() time.Duration {
	runner.stateLock.Lock()
	retryPeriod := runner.retryPeriod
	runner.stateLock.Unlock()

	return retryPeriod
}

// SetRetryPeriod configures the runner's retry period.
//
// This determines how long the Runner will wait between retries of an operation.
func (runner *retryRunner) SetRetryPeriod(retryPeriod time.Duration) {
	runner.stateLock.Lock()
	runner.retryPeriod = retryPeriod
	runner.stateLock.Unlock()
}

// DoAction performs the specified action until it succeeds or times out.
//
// description is a short description of the operation, used for logging.
// timeout is the period of time after which the operation is considered to have failed.
// action is the action function to invoke.
//
// Note: the first attempt does not begin until one retry period has elapsed,
// because the ticker fires only after its interval.
//
// Returns the error (if any) passed to Context.Fail or caused by the operation timing out.
func (runner *retryRunner) DoAction(description string, timeout time.Duration, action ActionFunc) error {
	// Capture the current configuration so a concurrent SetRetryPeriod cannot change it mid-run.
	runner.stateLock.Lock()
	retryPeriod := runner.retryPeriod
	runner.stateLock.Unlock()

	waitTimeout := time.NewTimer(timeout)
	defer waitTimeout.Stop()

	retryTicker := time.NewTicker(retryPeriod)
	defer retryTicker.Stop()

	// Fixed: use the captured retryPeriod here instead of re-reading
	// runner.retryPeriod without holding the lock (data race).
	log.Printf("%s - will attempt operation once every %d seconds until successful (timeout after %d seconds)...",
		description,
		retryPeriod/time.Second,
		timeout/time.Second,
	)

	context := newRunnerContext(description)
	for {
		select {
		case <-waitTimeout.C:
			// Overall deadline exceeded; report how many attempts were made.
			log.Printf("%s - operation timed out after %d seconds (%d attempts)",
				description,
				timeout/time.Second,
				context.IterationCount,
			)

			return &OperationTimeoutError{
				OperationDescription: description,
				Timeout:              timeout,
				Attempts:             context.IterationCount,
			}

		case <-retryTicker.C:
			context.NextIteration()

			log.Printf("%s - performing attempt %d...",
				description,
				context.IterationCount,
			)

			// Fixed: field references corrected to the exported names
			// (IterationCount / Error); the lowercase variants do not exist.
			action(context)
			if context.Error != nil {
				log.Printf("%s - attempt %d failed: %s.",
					description,
					context.IterationCount,
					context.Error,
				)

				return context.Error
			}

			if context.ShouldRetry {
				log.Printf("%s - attempt %d marked for retry (will try again)...",
					description,
					context.IterationCount,
				)

				continue
			}

			// Fixed: "sucessful" typo in the log message.
			log.Printf("%s - operation successful after %d attempt(s).",
				description,
				context.IterationCount,
			)

			return nil
		}
	}
}

0 comments on commit ae1407b

Please sign in to comment.