feat(docker-fetcher): multipart layer fetch
Fetch big image layers using more than one connection.

Signed-off-by: Adrien Delorme <azr@users.noreply.github.com>
azr committed May 21, 2024
1 parent 7cd7a5c commit 504bd15
Showing 15 changed files with 460 additions and 24 deletions.
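As a usage sketch only (not part of this commit): a client pull could opt in to multipart layer fetch via the options added below. The socket path, namespace, image reference, and option values here are illustrative assumptions.

package main

import (
	"context"

	containerd "github.com/containerd/containerd/v2/client"
	"github.com/containerd/containerd/v2/pkg/namespaces"
)

func main() {
	// Assumes a local containerd built with this commit.
	c, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		panic(err)
	}
	defer c.Close()

	ctx := namespaces.WithNamespace(context.Background(), "default")
	// Fetch each large layer over up to 4 connections, in 8 MiB chunks.
	_, err = c.Pull(ctx, "docker.io/library/ubuntu:24.04",
		containerd.WithPullUnpack,
		containerd.WithMaxConcurrentFetchPerDownload(4),
		containerd.WithConcurrentFetchChunksSizeMB(8),
	)
	if err != nil {
		panic(err)
	}
}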
19 changes: 19 additions & 0 deletions client/client.go
@@ -52,6 +52,7 @@ import (
sandboxproxy "github.com/containerd/containerd/v2/core/sandbox/proxy"
"github.com/containerd/containerd/v2/core/snapshots"
snproxy "github.com/containerd/containerd/v2/core/snapshots/proxy"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/containerd/v2/pkg/namespaces"
@@ -369,6 +370,14 @@ type RemoteContext struct {
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
MaxConcurrentDownloads int

// MaxConcurrentFetchPerDownload is the maximum number of connections
// we can use per download. Anything lower than 1 means 1.
MaxConcurrentFetchPerDownload int

// ConcurrentFetchChunksSizeMB is the size, in megabytes, of the chunks
// used when MaxConcurrentFetchPerDownload > 1.
ConcurrentFetchChunksSizeMB int

// MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push.
MaxConcurrentUploadedLayers int

@@ -380,6 +389,16 @@ type RemoteContext struct {
ChildLabelMap func(ocispec.Descriptor) []string
}

func (rCtx *RemoteContext) fetcherOptions() (opts []transfer.FetcherOpt) {
if rCtx.MaxConcurrentFetchPerDownload > 0 && rCtx.ConcurrentFetchChunksSizeMB > 0 {
opts = append(opts,
transfer.WithMaxConcurrentFetchPerDownload(rCtx.MaxConcurrentFetchPerDownload),
transfer.WithConcurrentFetchChunksSizeMB(rCtx.ConcurrentFetchChunksSizeMB),
)
}
return
}
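
The transfer.FetcherOpt helpers consumed here live in core/transfer and are not shown in this diff. As an assumption only, they plausibly amount to functional options over the transfer.FetcherConfig that dockerFetcher reads later in this commit:

// Hypothetical sketch of core/transfer (not shown in this diff; the real
// definitions may differ).
package transfer

type FetcherConfig struct {
	MaxConcurrentFetchPerDownload int
	ConcurrentFetchChunksSizeMB   int
}

type FetcherOpt func(*FetcherConfig)

func WithMaxConcurrentFetchPerDownload(n int) FetcherOpt {
	return func(c *FetcherConfig) { c.MaxConcurrentFetchPerDownload = n }
}

func WithConcurrentFetchChunksSizeMB(n int) FetcherOpt {
	return func(c *FetcherConfig) { c.ConcurrentFetchChunksSizeMB = n }
}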

func defaultRemoteContext() *RemoteContext {
return &RemoteContext{
Resolver: docker.NewResolver(docker.ResolverOptions{}),
16 changes: 16 additions & 0 deletions client/client_opts.go
@@ -239,6 +239,22 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt {
}
}

// WithMaxConcurrentFetchPerDownload sets the maximum number of concurrent
// connections used to fetch a single layer.
func WithMaxConcurrentFetchPerDownload(max int) RemoteOpt {
return func(_ *Client, c *RemoteContext) error {
c.MaxConcurrentFetchPerDownload = max
return nil
}
}

// WithConcurrentFetchChunksSizeMB sets the chunk size, in megabytes, used
// when fetching a layer over multiple connections. Both this option and
// WithMaxConcurrentFetchPerDownload must be positive for multipart fetch
// to take effect.
func WithConcurrentFetchChunksSizeMB(max int) RemoteOpt {
return func(_ *Client, c *RemoteContext) error {
c.ConcurrentFetchChunksSizeMB = max
return nil
}
}

// WithMaxConcurrentUploadedLayers sets max concurrent uploaded layer limit.
func WithMaxConcurrentUploadedLayers(max int) RemoteOpt {
return func(client *Client, c *RemoteContext) error {
2 changes: 1 addition & 1 deletion client/pull.go
@@ -182,7 +182,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
return images.Image{}, fmt.Errorf("failed to resolve reference %q: %w", ref, err)
}

fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
fetcher, err := rCtx.Resolver.Fetcher(ctx, name, rCtx.fetcherOptions()...)
if err != nil {
return images.Image{}, fmt.Errorf("failed to get fetcher for %q: %w", name, err)
}
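
Note that Resolver.Fetcher now takes variadic options. The updated remotes.Resolver interface is not shown in this diff; presumably the method reads roughly:

// Assumed shape of the updated remotes.Resolver method (not in this diff):
Fetcher(ctx context.Context, ref string, opts ...transfer.FetcherOpt) (Fetcher, error)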
98 changes: 94 additions & 4 deletions core/remotes/docker/fetcher.go
@@ -26,10 +26,12 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/klauspost/compress/zstd"
@@ -39,6 +41,7 @@

type dockerFetcher struct {
*dockerBase
config transfer.FetcherConfig
}

func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
@@ -262,7 +265,21 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest, op
return seeker, desc, nil
}

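// Binary byte-size units (kB = 1<<10, mB = 1<<20, and so on); the
// ConcurrentFetchChunksSizeMB option is multiplied by mB below.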
const (
_ = iota
kB int64 = 1 << (10 * iota)
mB
gB
tB
pB
eB
)

func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) {
parallelism := int64(r.config.MaxConcurrentFetchPerDownload)
chunkSize := int64(r.config.ConcurrentFetchChunksSizeMB)
chunkSize = chunkSize * mB

if mediatype == "" {
req.header.Set("Accept", "*/*")
} else {
@@ -274,7 +291,7 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
// Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints
// will return the header without supporting the range. The content
// range must always be checked.
req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
req.header.Set("range", fmt.Sprintf("bytes=%d-", offset))
}

resp, err := req.doWithRetries(ctx, nil)
@@ -302,12 +319,11 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
}
return nil, fmt.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error())
}
cr := resp.Header.Get("content-range")
if offset > 0 {
cr := resp.Header.Get("content-range")
if cr != "" {
if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) {
return nil, fmt.Errorf("unhandled content range in response: %v", cr)

}
} else {
// TODO: Should any cases where use of content range
@@ -324,13 +340,87 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
return nil, errors.New("unable to discard to offset")
}

// Content range was ignored by the server; we can't do concurrent fetches here.
parallelism = 0
}
}

body := resp.Body
encoding := strings.FieldsFunc(resp.Header.Get("Content-Encoding"), func(r rune) bool {
encoding := strings.FieldsFunc(resp.Header.Get("content-encoding"), func(r rune) bool {
return r == ' ' || r == '\t' || r == ','
})

minimumSizeForParallelDL := 1 * mB
if chunkSize > minimumSizeForParallelDL {
minimumSizeForParallelDL = chunkSize
}
totalSize, _ := strconv.ParseInt(resp.Header.Get("content-length"), 10, 0)
remaining := totalSize - offset
if parallelism > 1 && remaining > minimumSizeForParallelDL && req.body == nil {
// If we have a content length, we can use multiple requests to fetch
// the content in parallel. This makes downloads of bigger bodies
// faster, at the cost of more requests and up to
// ~(parallelism * goroutine footprint) of extra memory. The goroutine
// footprint should be the goroutine stack plus a 32 * 1024-byte copy
// buffer.
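// For example, a 100 MiB remainder with 8 MiB chunks yields 13 chunks:
// twelve full chunks plus a final 4 MiB chunk.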
numChunks := remaining / chunkSize
if numChunks*chunkSize < remaining {
numChunks++
}
queue := make(chan int64, parallelism)
stopChan := make(chan struct{})
readers, writers := make([]io.Reader, numChunks), make([]*io.PipeWriter, numChunks)
for i := int64(0); i < numChunks; i++ {
readers[i], writers[i] = io.Pipe()
}
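// Feed chunk indices to the workers in FIFO order; closing the channel
// lets each worker exit once every chunk has been claimed.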
go func() {
for i := int64(0); i < numChunks; i++ {
queue <- i
}
close(queue)
}()
for p := int64(0); p < parallelism; p++ {
// start parallel download workers
go func() {
for i := range queue { // first in first out
select {
case <-stopChan:
return
case <-ctx.Done():
return
default:
}
if i == 0 {
// Use a fresh err here; sharing the enclosing err across workers would race.
_, err := io.Copy(writers[i], io.LimitReader(resp.Body, chunkSize))
_ = resp.Body.Close()
_ = writers[i].CloseWithError(err)
continue
}
reqClone := req.clone()
reqClone.header.Set("range", fmt.Sprintf("bytes=%d-", offset+i*chunkSize))
// Check the request error before touching nresp to avoid a nil
// pointer dereference on failed requests.
nresp, err := reqClone.doWithRetries(ctx, nil)
if err == nil && nresp.StatusCode > 299 {
err = fmt.Errorf("unexpected status code %v: %s", reqClone.String(), nresp.Status)
_ = nresp.Body.Close()
}
if err != nil {
_ = writers[i].CloseWithError(err)
select {
case <-stopChan:
return
default:
close(stopChan)
}
return
}

_, err = io.Copy(writers[i], io.LimitReader(nresp.Body, chunkSize))
_ = nresp.Body.Close()
_ = writers[i].CloseWithError(err)
}
}()
}
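// io.MultiReader drains the chunk pipes strictly in order, so the caller
// reads a single sequential stream while later chunks keep downloading
// concurrently in the background.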
body = io.NopCloser(io.MultiReader(readers...))
}
for i := len(encoding) - 1; i >= 0; i-- {
algorithm := strings.ToLower(encoding[i])
switch algorithm {
11 changes: 8 additions & 3 deletions core/remotes/docker/fetcher_fuzz.go
@@ -26,6 +26,8 @@ import (
"net/http/httptest"
"net/url"
"strconv"

"github.com/containerd/containerd/v2/core/transfer"
)

func FuzzFetcher(data []byte) int {
@@ -46,9 +48,12 @@
return 0
}

f := dockerFetcher{&dockerBase{
repository: "nonempty",
}}
f := dockerFetcher{
&dockerBase{
repository: "nonempty",
},
transfer.FetcherConfig{},
}
host := RegistryHost{
Client: s.Client(),
Host: u.Host,