Skip to content

Commit

Permalink
feat(pubsublite): Retryable stream wrapper (#3068)
Browse files Browse the repository at this point in the history
This will be used to manage all of Pub/Sub Lite's bidi streaming RPCs: publish, subscribe, streaming cursor commit, partition assignment, etc.
  • Loading branch information
tmdiep authored Oct 27, 2020
1 parent c8d8237 commit 97cfd45
Show file tree
Hide file tree
Showing 2 changed files with 438 additions and 0 deletions.
100 changes: 100 additions & 0 deletions pubsublite/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

gax "github.com/googleapis/gax-go/v2"
)

// streamRetryer implements the retry policy for establishing gRPC stream
// connections.
type streamRetryer struct {
bo gax.Backoff
deadline time.Time
}

func newStreamRetryer(timeout time.Duration) *streamRetryer {
return &streamRetryer{
bo: gax.Backoff{
Initial: 10 * time.Millisecond,
Max: 10 * time.Second,
Multiplier: 2,
},
deadline: time.Now().Add(timeout),
}
}

func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
if isRetryableSendError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
if isRetryableRecvError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func isRetryableSendCode(code codes.Code) bool {
switch code {
// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
// smaller set of retryable codes.
case codes.DeadlineExceeded, codes.Unavailable:
return true
default:
return false
}
}

func isRetryableRecvCode(code codes.Code) bool {
switch code {
// Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java
case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown:
return true
default:
return false
}
}

func isRetryableSendError(err error) bool {
return isRetryableStreamError(err, isRetryableSendCode)
}

func isRetryableRecvError(err error) bool {
return isRetryableStreamError(err, isRetryableRecvCode)
}

func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
s, ok := status.FromError(err)
if !ok {
// Includes io.EOF, normal stream close.
// Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go
return true
}
return isEligible(s.Code())
}
Loading

0 comments on commit 97cfd45

Please sign in to comment.