Skip to content

Commit

Permalink
Merge fbc7b5e into 9e08723
Browse files Browse the repository at this point in the history
  • Loading branch information
XenoPhex committed Feb 9, 2017
2 parents 9e08723 + fbc7b5e commit 0662465
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
18 changes: 17 additions & 1 deletion consumer/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (c *Consumer) SetMaxRetryDelay(d time.Duration) {
atomic.StoreInt64(&c.maxRetryDelay, int64(d))
}

// SetRetryTimeout sets the overall timeout for retry actions.
//
// Defaults to 0/unlimited.
func (c *Consumer) SetRetryTimeout(d time.Duration) {
atomic.StoreInt64(&c.retryTimeout, int64(d))
}

// TailingLogs listens indefinitely for log messages only; other event types
// are dropped.
// Whenever an error is encountered, the error will be sent down the error
Expand Down Expand Up @@ -278,7 +285,14 @@ func (c *Consumer) retryAction(action func() (err error, done bool), errors chan
}
})

for {
var timeout time.Time
if t := atomic.LoadInt64(&c.retryTimeout); t > 0 {
timeout = time.Now().Add(time.Duration(t))
} else {
timeout = time.Unix(1<<63-62135596801, 999999999)
}

for time.Now().Before(timeout) {
err, done := action()
if done {
return
Expand All @@ -305,6 +319,8 @@ func (c *Consumer) retryAction(action func() (err error, done bool), errors chan
atomic.StoreInt64(&context.sleep, max)
}
}

errors <- noaa_errors.NewRetryTimeoutError()
}

func (c *Consumer) isTimeoutErr(err error) bool {
Expand Down
25 changes: 20 additions & 5 deletions consumer/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/cloudfoundry/noaa/consumer"
"github.com/cloudfoundry/noaa/errors"
noaa_errors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/noaa/test_helpers"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -338,17 +338,20 @@ var _ = Describe("Consumer (Asynchronous)", func() {

Describe("TailingLogs", func() {
var (
logMessages <-chan *events.LogMessage
errors <-chan error
retries uint
logMessages <-chan *events.LogMessage
errors <-chan error
retries uint
retryTimeout time.Duration
)

BeforeEach(func() {
retries = 5
retryTimeout = 0
startFakeTrafficController()
})

JustBeforeEach(func() {
cnsmr.SetRetryTimeout(retryTimeout)
logMessages, errors = cnsmr.TailingLogs(appGuid, authToken)
})

Expand Down Expand Up @@ -406,6 +409,18 @@ var _ = Describe("Consumer (Asynchronous)", func() {
})
})

Context("when connection cannot be established before retryTimeout", func() {
BeforeEach(func() {
retryTimeout = time.Nanosecond
})

It("sends a timeout error to the channel and returns", func() {
var err error
Eventually(errors).Should(Receive(&err))
Expect(err).To(MatchError(noaa_errors.RetryTimeoutError{}))
})
})

Context("with a failing handler", func() {
BeforeEach(func() {
fakeHandler.Fail = true
Expand Down Expand Up @@ -935,7 +950,7 @@ var _ = Describe("Consumer (Asynchronous)", func() {
})

func BeRetryable() types.GomegaMatcher {
return BeAssignableToTypeOf(errors.NewRetryError(fmt.Errorf("some-error")))
return BeAssignableToTypeOf(noaa_errors.NewRetryError(fmt.Errorf("some-error")))
}

func createError(message string) *events.Envelope {
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Consumer struct {
// minRetryDelay and maxRetryDelay must be the first words in this struct
// in order to be used atomically by 32-bit systems.
// https://golang.org/src/sync/atomic/doc.go?#L50
minRetryDelay, maxRetryDelay int64
minRetryDelay, maxRetryDelay, retryTimeout int64

trafficControllerUrl string
idleTimeout time.Duration
Expand Down
15 changes: 15 additions & 0 deletions errors/retry_timeout_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package errors

// RetryTimeoutError is encountered when retrying an operation times out.
type RetryTimeoutError struct {
}

// NewRetryTimeoutError constructs a RetryTimeoutError from any error.
func NewRetryTimeoutError() RetryTimeoutError {
return RetryTimeoutError{}
}

// Error implements error.
func (e RetryTimeoutError) Error() string {
return "Retry has timed out. Please ask your Cloud Foundry Operator for support."
}

0 comments on commit 0662465

Please sign in to comment.