Skip to content

Commit

Permalink
pubsub: retry stream errors properly
Browse files Browse the repository at this point in the history
- Recognize which errors are retryable
- Do exponential backoff
- Stop only when the context is done

Change-Id: I4606c534e167bd7ee138fa588e2dca04a5f1f234
Reviewed-on: https://code-review.googlesource.com/14152
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Kamal Aboul-Hosn <aboulhosn@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
  • Loading branch information
jba committed Jun 26, 2017
1 parent ee3ec37 commit cb38488
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions pubsub/service.go
Expand Up @@ -16,12 +16,13 @@ package pubsub

import (
"fmt"
"io"
"math"
"strings"
"sync"
"time"

"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal/version"
Expand All @@ -33,6 +34,7 @@ import (
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type nextStringFunc func() (string, error)
Expand Down Expand Up @@ -406,9 +408,14 @@ func (p *streamingPuller) call(f func(pb.Subscriber_StreamingPullClient) error)
for p.inFlight {
p.c.Wait()
}
// TODO(jba): better retry strategy.
var err error
for i := 0; i < 3; i++ {
var bo gax.Backoff
for {
select {
case <-p.ctx.Done():
p.err = p.ctx.Err()
default:
}
if p.err != nil {
return p.err
}
Expand All @@ -420,19 +427,33 @@ func (p *streamingPuller) call(f func(pb.Subscriber_StreamingPullClient) error)
p.c.L.Unlock()
err = f(spc)
p.c.L.Lock()
if !p.closed && (err == io.EOF || grpc.Code(err) == codes.Unavailable) {
time.Sleep(500 * time.Millisecond)
if !p.closed && err != nil && isRetryable(err) {
// Sleep with exponential backoff. Normally we wouldn't hold the lock while sleeping,
// but here it can't do any harm, since the stream is broken anyway.
gax.Sleep(p.ctx, bo.Pause())
p.openLocked()
continue
}
// Not a retry-able error; fail permanently.
// TODO(jba): for some errors, should we retry f (the Send or Recv)
// but not re-open the stream?
// Not an error, or not a retryable error; stop retrying.
p.err = err
return err
}
p.err = fmt.Errorf("retry exceeded; last error was %v", err)
return p.err
}

// Logic from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java.
func isRetryable(err error) bool {
s, ok := status.FromError(err)
if !ok { // includes io.EOF, normal stream close, which causes us to reopen
return true
}
switch s.Code() {
case codes.DeadlineExceeded, codes.Internal, codes.Canceled, codes.ResourceExhausted:
return true
case codes.Unavailable:
return !strings.Contains(s.Message(), "Server shutdownNow invoked")
default:
return false
}
}

func (p *streamingPuller) fetchMessages() ([]*Message, error) {
Expand Down

0 comments on commit cb38488

Please sign in to comment.