Skip to content

Commit

Permalink
Make retrying transport and http errors configurable (#1122)
Browse files Browse the repository at this point in the history
* Only wrap transport if it is a transport.Transport

Note: This is a breaking change

Authored-by: Dennis Leon <leonde@vmware.com>

* Provide additional information in transport.Error

- Useful by consumers providing their own Predicate to determine whether to retry or not

Authored-by: Dennis Leon <leonde@vmware.com>

* Add options to configure predicate/backoff when handling higher level http retries

Authored-by: Dennis Leon <leonde@vmware.com>

* backfill test

- add test to assert behavior around using a transport.Wrapper results
in no additional wrapping such as retry is done.

refactoring

- add comments
- rename transport.Transport -> transport.Wrapper
- make transport package return transport.Wrapper

Authored-by: Dennis Leon <leonde@vmware.com>

* Stop exposing Inner from transport.Wrapper

- Consumers should construct a transport.Wrapper via constructor
transport.NewWithContext
- options retryBackoff and retryPredicate should only apply to http
errors and not lower level transport errors. (Consumers can still provide
a transport with the retry behavior they want)

Authored-by: Dennis Leon <leonde@vmware.com>
  • Loading branch information
DennisDenuto committed Sep 22, 2021
1 parent c71ca9b commit 34b7f00
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 77 deletions.
26 changes: 15 additions & 11 deletions pkg/v1/google/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,23 @@ func newLister(repo name.Repository, options ...Option) (*lister, error) {
}
}

// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
l.transport = transport.NewLogger(l.transport)
}
// transport.Wrapper is a signal that consumers are opt-ing into providing their own transport without any additional wrapping.
// This is to allow consumers full control over the transports logic, such as providing retry logic.
if _, ok := l.transport.(*transport.Wrapper); !ok {
// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
l.transport = transport.NewLogger(l.transport)
}

// Wrap the transport in something that can retry network flakes.
l.transport = transport.NewRetry(l.transport)
// Wrap the transport in something that can retry network flakes.
l.transport = transport.NewRetry(l.transport)

// Wrap this last to prevent transport.New from double-wrapping.
if l.userAgent != "" {
l.transport = transport.NewUserAgent(l.transport, l.userAgent)
// Wrap this last to prevent transport.New from double-wrapping.
if l.userAgent != "" {
l.transport = transport.NewUserAgent(l.transport, l.userAgent)
}
}

scopes := []string{repo.Scope(transport.PullScope)}
Expand Down
2 changes: 2 additions & 0 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
context: o.context,
updates: o.updates,
lastUpdate: &v1.Update{},
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}

// Collect the total size of blobs and manifests we're about to write.
Expand Down
89 changes: 89 additions & 0 deletions pkg/v1/remote/multi_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package remote

import (
"context"
"io"
"io/ioutil"
"log"
"net/http"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/google/go-containerregistry/pkg/v1/empty"
"github.com/google/go-containerregistry/pkg/v1/mutate"
"github.com/google/go-containerregistry/pkg/v1/random"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/google/go-containerregistry/pkg/v1/validate"
)
Expand Down Expand Up @@ -220,6 +223,78 @@ func TestMultiWrite_Retry(t *testing.T) {
}

})

t.Run("do not retry transport errors if transport.Wrapper is used", func(t *testing.T) {
// reference a http server that is not listening (used to pick a port that isn't listening)
onlyHandlesPing := http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
if strings.HasSuffix(request.URL.Path, "/v2/") {
responseWriter.WriteHeader(200)
return
}
})
s := httptest.NewServer(onlyHandlesPing)
defer s.Close()

u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}

tag1 := mustNewTag(t, u.Host+"/repo:tag1")

// using a transport.Wrapper, meaning retry logic should not be wrapped
doesNotRetryTransport := &countTransport{inner: http.DefaultTransport}
transportWrapper, err := transport.NewWithContext(context.Background(), tag1.Repository.Registry, nil, doesNotRetryTransport, nil)
if err != nil {
t.Fatal(err)
}

if err := MultiWrite(map[name.Reference]Taggable{
tag1: img1,
}, WithTransport(transportWrapper), WithJobs(1)); err == nil {
t.Errorf("Expected an error, got nil")
}

// expect count == 1 since jobs is set to 1 and we should not retry on transport eof error
if doesNotRetryTransport.count != 1 {
t.Errorf("Incorrect count, got %d, want %d", doesNotRetryTransport.count, 1)
}
})

t.Run("do not add UserAgent if transport.Wrapper is used", func(t *testing.T) {
expectedNotUsedUserAgent := "TEST_USER_AGENT"

handler := registry.New()

registryThatAssertsUserAgentIsCorrect := http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
if strings.Contains(request.Header.Get("User-Agent"), expectedNotUsedUserAgent) {
t.Fatalf("Should not contain User-Agent: %s, Got: %s", expectedNotUsedUserAgent, request.Header.Get("User-Agent"))
}

handler.ServeHTTP(responseWriter, request)
})

s := httptest.NewServer(registryThatAssertsUserAgentIsCorrect)

defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}

tag1 := mustNewTag(t, u.Host+"/repo:tag1")
// using a transport.Wrapper, meaning retry logic should not be wrapped
transportWrapper, err := transport.NewWithContext(context.Background(), tag1.Repository.Registry, nil, http.DefaultTransport, nil)
if err != nil {
t.Fatal(err)
}

if err := MultiWrite(map[name.Reference]Taggable{
tag1: img1,
}, WithTransport(transportWrapper), WithUserAgent(expectedNotUsedUserAgent)); err != nil {
t.Fatal(err)
}
})
}

// TestMultiWrite_Deep tests that a deeply nested tree of manifest lists gets
Expand Down Expand Up @@ -259,3 +334,17 @@ func TestMultiWrite_Deep(t *testing.T) {
t.Error("Validate() =", err)
}
}

type countTransport struct {
count int
inner http.RoundTripper
}

func (t *countTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if strings.HasSuffix(req.URL.Path, "/v2/") {
return t.inner.RoundTrip(req)
}

t.count++
return nil, io.ErrUnexpectedEOF
}
85 changes: 68 additions & 17 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ package remote
import (
"context"
"errors"
"io"
"net/http"
"syscall"
"time"

"github.com/google/go-containerregistry/internal/retry"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"
Expand All @@ -39,13 +43,36 @@ type options struct {
allowNondistributableArtifacts bool
updates chan<- v1.Update
pageSize int
retryBackoff Backoff
retryPredicate retry.Predicate
}

var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}

// Backoff is an alias of retry.Backoff to expose this configuration option to consumers of this lib
type Backoff = retry.Backoff

var defaultRetryPredicate retry.Predicate = func(err error) bool {
// Various failure modes here, as we're often reading from and writing to
// the network.
if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
logs.Warn.Printf("retrying %v", err)
return true
}
return false
}

// Try this three times, waiting 1s after first failure, 3s after second.
var defaultRetryBackoff = Backoff{
Duration: 1.0 * time.Second,
Factor: 3.0,
Jitter: 0.1,
Steps: 3,
}

const (
defaultJobs = 4

Expand All @@ -56,12 +83,14 @@ const (

func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o := &options{
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
pageSize: defaultPageSize,
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
pageSize: defaultPageSize,
retryPredicate: defaultRetryPredicate,
retryBackoff: defaultRetryBackoff,
}

for _, option := range opts {
Expand All @@ -78,26 +107,32 @@ func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o.auth = auth
}

// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
o.transport = transport.NewLogger(o.transport)
}
// transport.Wrapper is a signal that consumers are opt-ing into providing their own transport without any additional wrapping.
// This is to allow consumers full control over the transports logic, such as providing retry logic.
if _, ok := o.transport.(*transport.Wrapper); !ok {
// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
o.transport = transport.NewLogger(o.transport)
}

// Wrap the transport in something that can retry network flakes.
o.transport = transport.NewRetry(o.transport)
// Wrap the transport in something that can retry network flakes.
o.transport = transport.NewRetry(o.transport)

// Wrap this last to prevent transport.New from double-wrapping.
if o.userAgent != "" {
o.transport = transport.NewUserAgent(o.transport, o.userAgent)
// Wrap this last to prevent transport.New from double-wrapping.
if o.userAgent != "" {
o.transport = transport.NewUserAgent(o.transport, o.userAgent)
}
}

return o, nil
}

// WithTransport is a functional option for overriding the default transport
// for remote operations.
// If transport.Wrapper is provided, this signals that the consumer does *not* want any further wrapping to occur.
// i.e. logging, retry and useragent
//
// The default transport its http.DefaultTransport.
func WithTransport(t http.RoundTripper) Option {
Expand Down Expand Up @@ -212,3 +247,19 @@ func WithPageSize(size int) Option {
return nil
}
}

// WithRetryBackoff sets the httpBackoff for retry HTTP operations.
func WithRetryBackoff(backoff Backoff) Option {
return func(o *options) error {
o.retryBackoff = backoff
return nil
}
}

// WithRetryPredicate sets the predicate for retry HTTP operations.
func WithRetryPredicate(predicate retry.Predicate) Option {
return func(o *options) error {
o.retryPredicate = predicate
return nil
}
}
12 changes: 6 additions & 6 deletions pkg/v1/remote/transport/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type Error struct {
Errors []Diagnostic `json:"errors,omitempty"`
// The http status code returned.
StatusCode int
// The request that failed.
Request *http.Request
// The raw body if we couldn't understand it.
rawBody string
// The request that failed.
request *http.Request
}

// Check that Error implements error
Expand All @@ -58,8 +58,8 @@ var _ error = (*Error)(nil)
// Error implements error
func (e *Error) Error() string {
prefix := ""
if e.request != nil {
prefix = fmt.Sprintf("%s %s: ", e.request.Method, redactURL(e.request.URL))
if e.Request != nil {
prefix = fmt.Sprintf("%s %s: ", e.Request.Method, redactURL(e.Request.URL))
}
return prefix + e.responseErr()
}
Expand All @@ -68,7 +68,7 @@ func (e *Error) responseErr() string {
switch len(e.Errors) {
case 0:
if len(e.rawBody) == 0 {
if e.request != nil && e.request.Method == http.MethodHead {
if e.Request != nil && e.Request.Method == http.MethodHead {
return fmt.Sprintf("unexpected status code %d %s (HEAD responses have no body, use GET for details)", e.StatusCode, http.StatusText(e.StatusCode))
}
return fmt.Sprintf("unexpected status code %d %s", e.StatusCode, http.StatusText(e.StatusCode))
Expand Down Expand Up @@ -194,7 +194,7 @@ func CheckError(resp *http.Response, codes ...int) error {

structuredError.rawBody = string(b)
structuredError.StatusCode = resp.StatusCode
structuredError.request = resp.Request
structuredError.Request = resp.Request

return structuredError
}
5 changes: 4 additions & 1 deletion pkg/v1/remote/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ type options struct {
predicate retry.Predicate
}

// Backoff is an alias of retry.Backoff to expose this configuration option to consumers of this lib
type Backoff = retry.Backoff

// WithRetryBackoff sets the backoff for retry operations.
func WithRetryBackoff(backoff retry.Backoff) Option {
func WithRetryBackoff(backoff Backoff) Option {
return func(o *options) {
o.backoff = backoff
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/v1/remote/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func NewWithContext(ctx context.Context, reg name.Registry, auth authn.Authentic

switch pr.challenge.Canonical() {
case anonymous:
return t, nil
return &Wrapper{t}, nil
case basic:
return &basicTransport{inner: t, auth: auth, target: reg.RegistryStr()}, nil
return &Wrapper{&basicTransport{inner: t, auth: auth, target: reg.RegistryStr()}}, nil
case bearer:
// We require the realm, which tells us where to send our Basic auth to turn it into Bearer auth.
realm, ok := pr.parameters["realm"]
Expand All @@ -96,8 +96,19 @@ func NewWithContext(ctx context.Context, reg name.Registry, auth authn.Authentic
if err := bt.refresh(ctx); err != nil {
return nil, err
}
return bt, nil
return &Wrapper{bt}, nil
default:
return nil, fmt.Errorf("unrecognized challenge: %s", pr.challenge)
}
}

// Wrapper results in *not* wrapping supplied transport with additional logic such as retries, useragent and debug logging
// Consumers are opt-ing into providing their own transport without any additional wrapping.
type Wrapper struct {
inner http.RoundTripper
}

// RoundTrip delegates to the inner RoundTripper
func (w *Wrapper) RoundTrip(in *http.Request) (*http.Response, error) {
return w.inner.RoundTrip(in)
}

0 comments on commit 34b7f00

Please sign in to comment.