all: add per-chunk retry deadline
In https://code-review.googlesource.com/c/google-api-go-client/+/43470,
we accidentally removed a piece of logic that gave each chunk* a
deadline. This adds it back.

This CL also adds tests for retry logic, so that we don't miss it next
time.

This CL also removes contextDone(ctx context.Context) bool: it was only
used in one place, and that place performed the exact same check again
right after calling contextDone.
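
For reference, the surviving check is the usual select on ctx.Done (a
sketch of the idiom, not the exact code in the diff below):

	select {
	case <-ctx.Done():
		return nil, ctx.Err() // handles a cancelled context even when pause is 0
	case <-time.After(pause):
	}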

* Note: the single for loop in the aforementioned CL was not a loop that
only handled retries for each chunk. It handled both retries and multiple
chunks (which can occur by way of incomplete transfers). When a new chunk
was started, the loop was not exited; instead, the backoff and
retryDeadline were re-initialized.

In this CL, that single for loop is split into an outer loop (one
iteration per chunk) and an inner loop (retries for a single chunk), and
the backoff/retry reset logic is removed. This makes it clearer what is
happening.
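
In outline, the new control flow looks roughly like this (an
illustrative sketch, not the exact code; see gensupport/resumable.go
below):

	for { // outer loop: one iteration per chunk
		bo := backoff()                        // fresh backoff for this chunk
		quitAfter := time.After(retryDeadline) // fresh deadline for this chunk

		for { // inner loop: retries for this chunk
			// Wait for the pause, a cancelled context, or the chunk's
			// retry deadline; then send the chunk. Break on success,
			// return on cancellation or deadline.
		}

		// On an incomplete-transfer response (308), continue with the
		// next chunk; otherwise return.
	}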

Fixes #389

Change-Id: Ib66905efb5eaca5f9a40dccd343bff7c7d2b5aa3
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/44070
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Tyler Bui-Palsulich <tbp@google.com>
jeanbza committed Aug 19, 2019
1 parent 573115b commit 4fac4de
Showing 3 changed files with 180 additions and 46 deletions.
95 changes: 50 additions & 45 deletions gensupport/resumable.go
@@ -22,10 +22,13 @@ type Backoff interface {
 	Pause() time.Duration
 }
 
-// This is declared as a global variable so that tests can overwrite it.
-var backoff = func() Backoff {
-	return &gax.Backoff{Initial: 100 * time.Millisecond}
-}
+// These are declared as global variables so that tests can overwrite them.
+var (
+	retryDeadline = 32 * time.Second
+	backoff       = func() Backoff {
+		return &gax.Backoff{Initial: 100 * time.Millisecond}
+	}
+)
 
 const (
 	// statusTooManyRequests is returned by the storage API if the
@@ -148,15 +151,6 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, e
 	return res, nil
 }
 
-func contextDone(ctx context.Context) bool {
-	select {
-	case <-ctx.Done():
-		return true
-	default:
-		return false
-	}
-}
-
 // Upload starts the process of a resumable upload with a cancellable context.
 // It retries using the provided back off strategy until cancelled or the
 // strategy indicates to stop retrying.
@@ -166,8 +160,6 @@ func contextDone(ctx context.Context) bool {
 // rx is private to the auto-generated API code.
 // Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
 func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
-	var pause time.Duration
-
 	var shouldRetry = func(status int, err error) bool {
 		if 500 <= status && status <= 599 {
 			return true
@@ -184,53 +176,66 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
 		return false
 	}
 
-	bo := backoff()
+	// There are a couple of cases where it's possible for err and resp to both
+	// be non-nil. However, we expose a simpler contract to our callers: exactly
+	// one of resp and err will be non-nil. This means that any response body
+	// must be closed here before returning a non-nil error.
+	var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
+		if err != nil {
+			if resp != nil && resp.Body != nil {
+				resp.Body.Close()
+			}
+			return nil, err
+		}
+		return resp, nil
+	}
 
 	// Send all chunks.
 	for {
-		// Ensure that we return in the case of cancelled context, even if pause is 0.
-		if contextDone(ctx) {
-			return nil, ctx.Err()
-		}
-		select {
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		case <-time.After(pause):
-		}
-		resp, err = rx.transferChunk(ctx)
+		var pause time.Duration
+
+		// Each chunk gets its own initialized-at-zero retry.
+		bo := backoff()
+		quitAfter := time.After(retryDeadline)
+
+		// Retry loop for a single chunk.
+		for {
+			select {
+			case <-ctx.Done():
+				if err == nil {
+					err = ctx.Err()
+				}
+				return prepareReturn(resp, err)
+			case <-time.After(pause):
+			case <-quitAfter:
+				return prepareReturn(resp, err)
+			}
+
+			resp, err = rx.transferChunk(ctx)
 
-		var status int
-		if resp != nil {
-			status = resp.StatusCode
-		}
+			var status int
+			if resp != nil {
+				status = resp.StatusCode
+			}
 
-		// Check if we should retry the request.
-		if shouldRetry(status, err) {
+			// Check if we should retry the request.
+			if !shouldRetry(status, err) {
+				break
+			}
+
 			pause = bo.Pause()
 			if resp != nil && resp.Body != nil {
 				resp.Body.Close()
 			}
-			continue
 		}
 
 		// If the chunk was uploaded successfully, but there's still
 		// more to go, upload the next chunk without any delay.
 		if statusResumeIncomplete(resp) {
-			pause = 0
 			resp.Body.Close()
 			continue
 		}
 
-		// It's possible for err and resp to both be non-nil here, but we expose a simpler
-		// contract to our callers: exactly one of resp and err will be non-nil. This means
-		// that any response body must be closed here before returning a non-nil error.
-		if err != nil {
-			if resp != nil && resp.Body != nil {
-				resp.Body.Close()
-			}
-			return nil, err
-		}
-
-		return resp, nil
+		return prepareReturn(resp, err)
 	}
 }
119 changes: 119 additions & 0 deletions gensupport/resumable_test.go
@@ -13,6 +13,7 @@ import (
 	"reflect"
 	"strings"
 	"testing"
+	"time"
 )
 
 type unexpectedReader struct{}
@@ -64,6 +65,9 @@ func (tc *trackingCloser) Open() {
 }
 
 func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	if len(t.events) == 0 {
+		panic("Ran out of events, but got a request")
+	}
 	ev := t.events[0]
 	t.events = t.events[1:]
 	if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
@@ -293,3 +297,118 @@ func TestCancelUpload(t *testing.T) {
 		t.Errorf("unclosed request bodies: %v", tr.bodies)
 	}
 }
+
+func TestRetry_Bounded(t *testing.T) {
+	const (
+		chunkSize = 90
+		mediaSize = 300
+	)
+	media := strings.NewReader(strings.Repeat("a", mediaSize))
+
+	tr := &interruptibleTransport{
+		buf: make([]byte, 0, mediaSize),
+		events: []event{
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+		},
+		bodies: bodyTracker{},
+	}
+
+	rx := &ResumableUpload{
+		Client:    &http.Client{Transport: tr},
+		Media:     NewMediaBuffer(media, chunkSize),
+		MediaType: "text/plain",
+		Callback:  func(int64) {},
+	}
+
+	oldRetryDeadline := retryDeadline
+	retryDeadline = time.Second
+	defer func() { retryDeadline = oldRetryDeadline }()
+
+	oldBackoff := backoff
+	backoff = func() Backoff { return new(PauseForeverBackoff) }
+	defer func() { backoff = oldBackoff }()
+
+	resCode := make(chan int)
+	go func() {
+		resp, err := rx.Upload(context.Background())
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		resCode <- resp.StatusCode
+	}()
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for Upload to complete")
+	case got := <-resCode:
+		if want := http.StatusServiceUnavailable; got != want {
+			t.Fatalf("want %d, got %d", want, got)
+		}
+	}
+}
+
+func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
+	const (
+		chunkSize = 90
+		mediaSize = 300
+	)
+	media := strings.NewReader(strings.Repeat("a", mediaSize))
+
+	tr := &interruptibleTransport{
+		buf: make([]byte, 0, mediaSize),
+		events: []event{
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+			// cum: 1s sleep
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+			// cum: 2s sleep
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+			// cum: 3s sleep
+			{"bytes 0-89/*", http.StatusServiceUnavailable},
+			// cum: 4s sleep
+			{"bytes 0-89/*", 308},
+			// cum: 1s sleep <-- resets because it's a new chunk
+			{"bytes 90-179/*", 308},
+			// cum: 1s sleep <-- resets because it's a new chunk
+			{"bytes 180-269/*", 308},
+			// cum: 1s sleep <-- resets because it's a new chunk
+			{"bytes 270-299/300", 200},
+		},
+		bodies: bodyTracker{},
+	}
+
+	rx := &ResumableUpload{
+		Client:    &http.Client{Transport: tr},
+		Media:     NewMediaBuffer(media, chunkSize),
+		MediaType: "text/plain",
+		Callback:  func(int64) {},
+	}
+
+	oldRetryDeadline := retryDeadline
+	retryDeadline = 5 * time.Second
+	defer func() { retryDeadline = oldRetryDeadline }()
+
+	oldBackoff := backoff
+	backoff = func() Backoff { return new(PauseOneSecond) }
+	defer func() { backoff = oldBackoff }()
+
+	resCode := make(chan int, 1)
+	go func() {
+		resp, err := rx.Upload(context.Background())
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		resCode <- resp.StatusCode
+	}()
+
+	select {
+	case <-time.After(15 * time.Second):
+		t.Fatal("timed out waiting for Upload to complete")
+	case got := <-resCode:
+		if want := http.StatusOK; got != want {
+			t.Fatalf("want %d, got %d", want, got)
		}
+	}
+}
12 changes: 11 additions & 1 deletion gensupport/util_test.go
@@ -27,7 +27,17 @@ func (er *errReader) Read(p []byte) (int, error) {
 	return n, nil
 }
 
-// NoPauseBackoff implements backoff strategy with infinite 0-length pauses.
+// NoPauseBackoff implements backoff with infinite 0-length pauses.
 type NoPauseBackoff struct{}
 
 func (bo *NoPauseBackoff) Pause() time.Duration { return 0 }
+
+// PauseOneSecond implements backoff with infinite 1s pauses.
+type PauseOneSecond struct{}
+
+func (bo *PauseOneSecond) Pause() time.Duration { return time.Second }
+
+// PauseForeverBackoff implements backoff with infinite 1h pauses.
+type PauseForeverBackoff struct{}
+
+func (bo *PauseForeverBackoff) Pause() time.Duration { return time.Hour }
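
For reference, the production backoff is a gax.Backoff with only Initial
set. A more fully configured strategy satisfying the same Backoff
interface might look like this (a sketch assuming the gax package already
imported by resumable.go; the field values are illustrative, not part of
this commit):

	// boundedBackoff grows pauses exponentially from 100ms up to a 30s cap.
	var boundedBackoff = func() Backoff {
		return &gax.Backoff{
			Initial:    100 * time.Millisecond,
			Max:        30 * time.Second,
			Multiplier: 2,
		}
	}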
