Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (c *Client) Stat(ctx context.Context, key Key, opts ...RequestOption) (http
// Create stores a new object in the cache server. The returned CacheWriter
// must be closed to commit the upload. Call Abort instead of Close to discard
// the in-progress write and ensure the object is never made visible.
func (c *Client) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (CacheWriter, error) {
func (c *Client) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration, opts ...RequestOption) (CacheWriter, error) {
ctx, cancel := context.WithCancelCause(ctx)
pr, pw := io.Pipe()

Expand All @@ -244,6 +244,16 @@ func (c *Client) Create(ctx context.Context, key Key, headers http.Header, ttl t
}

maps.Copy(req.Header, headers)
req.Header.Del(ETagKey)
ro := NewRequestOptions(opts...)
if ro.ETagSet {
etag, err := FormatETag(ro.ETag)
if err != nil {
cancel(err)
return nil, errors.Join(errors.WithStack(err), pr.Close(), pw.Close())
}
req.Header.Set(ETagKey, etag)
}

if ttl > 0 {
req.Header.Set("Time-To-Live", ttl.String())
Expand Down
36 changes: 35 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func (fs *fakeServer) put(w http.ResponseWriter, r *http.Request) {
}
headers[k] = v
}
headers.Set("ETag", fakeETag(body))
if headers.Get("ETag") == "" {
headers.Set("ETag", fakeETag(body))
}
fs.mu.Lock()
fs.objects[fs.key(r)] = fakeObject{body: body, headers: headers}
fs.mu.Unlock()
Expand Down Expand Up @@ -204,6 +206,38 @@ func TestObjectRoundTrip(t *testing.T) {
assert.True(t, isNotExist(err))
}

// TestCreateWithETag uploads an object through Create with an explicit ETag
// supplied via WithETag, then checks that Stat reports that ETag in its quoted
// form.
func TestCreateWithETag(t *testing.T) {
	server := newFakeServer(nil)
	defer server.Close()

	cl := client.New(server.URL, nil).Namespace("test")
	defer cl.Close()

	var (
		ctx = t.Context()
		key = client.NewKey("explicit-etag")
	)

	// Write a small body and commit it by closing the writer.
	writer, err := cl.Create(ctx, key, nil, 0, client.WithETag("caller-etag"))
	assert.NoError(t, err)
	_, err = writer.Write([]byte("hello"))
	assert.NoError(t, err)
	err = writer.Close()
	assert.NoError(t, err)

	// The stored headers must carry the caller's ETag, quoted.
	stored, err := cl.Stat(ctx, key)
	assert.NoError(t, err)
	assert.Equal(t, `"caller-etag"`, stored.Get("ETag"))
}

// TestCreateWithInvalidETag checks that Create returns an error when the value
// handed to WithETag is the already-quoted string `"quoted"`.
func TestCreateWithInvalidETag(t *testing.T) {
	server := newFakeServer(nil)
	defer server.Close()

	cl := client.New(server.URL, nil).Namespace("test")
	defer cl.Close()

	ctx := t.Context()
	key := client.NewKey("invalid-etag")
	_, createErr := cl.Create(ctx, key, nil, 0, client.WithETag(`"quoted"`))
	assert.Error(t, createErr)
}

// isNotExist reports whether err is a non-nil error that os.IsNotExist
// classifies as "does not exist".
func isNotExist(err error) bool {
	if err == nil {
		return false
	}
	return os.IsNotExist(err)
}

func TestHeaderFuncAppliesAuth(t *testing.T) {
Expand Down
41 changes: 4 additions & 37 deletions client/parallel_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type RangeReader interface {
// (412) and ParallelGet returns an error rather than splicing bytes from two
// revisions; a server that ignores If-Match is caught by verifying each chunk's
// response ETag. A missing or truncated chunk is likewise reported as an error,
// so a partially written dst must be discarded by the caller on failure. An object with no ETag to pin to (e.g. one stored
// before ETags were recorded) cannot be kept revision-safe across chunks, so it
// falls back to a single full read instead of parallelising. A concurrency of
// so a partially written dst must be discarded by the caller on failure. An
// object with no ETag to pin to cannot be kept revision-safe across chunks, so
// it falls back to a single full read instead of parallelising. A concurrency of
// 1 likewise reads the whole object in one request, since chunking a single
// worker would only serialise ranged GETs for no benefit.
//
Expand All @@ -46,18 +46,13 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
}
concurrency = max(concurrency, 1)

// A single worker gains nothing from chunking — it would only serialise
// ranged GETs — so skip discovery entirely and read the object in one
// revision-consistent request.
if concurrency == 1 {
return fullRead(ctx, c, key, dst)
}

// Discovery: the first ranged Open delivers chunk zero and reveals the total
// size and ETag used to pin the rest.
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
if errors.Is(err, ErrRangeNotSatisfiable) {
return nil // Empty object: nothing to write.
return nil
}
if err != nil {
return errors.Wrap(err, "parallel get: open first chunk")
Expand All @@ -66,10 +61,6 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
etag := headers.Get(ETagKey)
total, hasRange := parseContentRangeTotal(headers.Get("Content-Range"))

// A backend that ignored the range (no Content-Range), or an object that
// fits within the first chunk, is delivered entirely by this response: copy
// it and return, as there is nothing to parallelise. A negative want skips
// the length check when the total size is unknown.
firstLen := min(chunkSize, total)
if !hasRange {
firstLen = -1
Expand All @@ -78,27 +69,18 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
return errors.Wrap(writeChunkAt(dst, 0, firstLen, rc), "parallel get")
}

// Subsequent chunks are pinned to the discovery ETag via IfMatch. Without a
// validator there is nothing to pin to (IfMatch("") is a no-op), so chunks
// could be spliced across a rewrite undetected. Objects stored before ETags
// were recorded fall here, so fall back to a single, revision-consistent
// read rather than parallelising.
if etag == "" {
if err := rc.Close(); err != nil {
return errors.Wrap(err, "parallel get: close discovery reader")
}
return fullRead(ctx, c, key, dst)
}

// Multiple chunks: copy the already-open first chunk concurrently with the
// rest rather than blocking on it here. The first goroutine is scheduled
// before the limit can be reached, so it never stalls holding an open body.
numChunks := int((total + chunkSize - 1) / chunkSize)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
eg.Go(func() error { return writeChunkAt(dst, 0, firstLen, rc) })
for seq := 1; seq < numChunks; seq++ {
// Stop scheduling once a chunk has failed and cancelled the group.
if egCtx.Err() != nil {
break
}
Expand All @@ -109,11 +91,6 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
return errors.Wrap(eg.Wait(), "parallel get")
}

// fullRead downloads the entire object in a single request and writes it at
// offset zero. It is used when chunking would add no value (a single worker) or
// cannot be made revision-safe (no ETag to pin). The body is a single
// consistent revision, but its length is unknown up front, so writeChunkAt's
// length check is skipped (-1).
func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error {
rc, _, err := c.Open(ctx, key)
if err != nil {
Expand All @@ -122,11 +99,6 @@ func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) erro
return errors.Wrap(writeChunkAt(dst, 0, -1, rc), "parallel get")
}

// fetchChunk opens the [start, end) range pinned to etag via If-Match and
// writes it at start. An ETag change (the object was rewritten mid-download)
// surfaces as ErrPreconditionFailed, or as a response-ETag mismatch when the
// server ignores If-Match; either way it is reported as an error, as is a
// short read.
func fetchChunk(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, start, end int64, etag string) error {
rc, headers, err := c.Open(ctx, key, Range(start, end), IfMatch(etag))
if errors.Is(err, ErrPreconditionFailed) {
Expand All @@ -144,8 +116,6 @@ func fetchChunk(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, st
return writeChunkAt(dst, start, end-start, rc)
}

// writeChunkAt streams src into dst at off and closes src. It fails if fewer
// than want bytes arrive; a negative want skips that check (total size unknown).
func writeChunkAt(dst io.WriterAt, off, want int64, src io.ReadCloser) error {
n, copyErr := io.Copy(io.NewOffsetWriter(dst, off), src)
if err := errors.Join(copyErr, src.Close()); err != nil {
Expand All @@ -157,9 +127,6 @@ func writeChunkAt(dst io.WriterAt, off, want int64, src io.ReadCloser) error {
return nil
}

// parseContentRangeTotal extracts the total size from a Content-Range value of
// the form "bytes start-end/total". It returns ok=false when the header is
// absent or unparseable.
func parseContentRangeTotal(contentRange string) (total int64, ok bool) {
_, size, found := strings.Cut(contentRange, "/")
if !found {
Expand Down
100 changes: 69 additions & 31 deletions client/parallel_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"slices"
"strconv"
"strings"
Expand All @@ -17,12 +18,8 @@ import (
"github.com/block/cachew/client"
)

// Compile-time assertion that the concrete Client satisfies the narrow
// interface ParallelGet drives.
var _ client.RangeReader = (*client.Client)(nil)

// bufferAt is an in-memory io.WriterAt that extends like a file, zero-filling
// any gap, so tests can assert reassembly without touching disk.
type bufferAt struct {
mu sync.Mutex
buf []byte
Expand All @@ -38,10 +35,6 @@ func (b *bufferAt) WriteAt(p []byte, off int64) (int, error) {
return len(p), nil
}

// rangeFlipReader serves correct byte ranges but ignores If-Match and reports
// a different ETag for any chunk past the first, simulating an object
// rewritten mid-download behind a server that does not honour preconditions.
// It exercises the client-side response-ETag guard.
type rangeFlipReader struct {
data []byte
firstETag string
Expand Down Expand Up @@ -77,9 +70,6 @@ func TestParallelGetETagMismatch(t *testing.T) {
assert.Contains(t, err.Error(), "object changed during read")
}

// ifMatchFlipReader honours If-Match like a conforming server whose object is
// rewritten after the discovery request, so every pinned chunk fails its
// precondition with a bodiless rejection.
type ifMatchFlipReader struct {
data []byte
firstETag string
Expand Down Expand Up @@ -118,8 +108,6 @@ func TestParallelGetPreconditionFailedOnRewrite(t *testing.T) {
assert.Contains(t, err.Error(), "object changed during read")
}

// noETagReader serves byte ranges but never sets an ETag, modelling a legacy
// entry or a RangeReader implementation that omits it.
type noETagReader struct {
data []byte
}
Expand All @@ -140,8 +128,6 @@ func (n *noETagReader) Open(_ context.Context, _ client.Key, opts ...client.Requ
}

func TestParallelGetNoETagMultiChunk(t *testing.T) {
// A multi-chunk object with no ETag can't be pinned, so it falls back to a
// single full read (backwards compatible with objects stored before ETags).
data := make([]byte, 1000)
for i := range data {
data[i] = byte(i % 251)
Expand All @@ -154,8 +140,6 @@ func TestParallelGetNoETagMultiChunk(t *testing.T) {
}

func TestParallelGetNoETagSingleChunk(t *testing.T) {
// A no-ETag object delivered entirely by the discovery request is a single
// revision, so it succeeds without pinning.
data := []byte("0123456789")
c := &noETagReader{data: data}
var dst bufferAt
Expand All @@ -164,9 +148,6 @@ func TestParallelGetNoETagSingleChunk(t *testing.T) {
assert.Equal(t, data, dst.buf)
}

// changingSizeReader serves a multi-chunk body with no ETag on the ranged
// discovery request, then a differently sized body on the subsequent full
// (non-range) read, modelling an object rewritten between the two requests.
type changingSizeReader struct {
discovery []byte
rewritten []byte
Expand All @@ -192,16 +173,11 @@ func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...clien
return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil
}

// openRecord captures the fetch-shaping options of a single Open call: the
// Range requested ("" for a full read) and the If-Match validator it was
// pinned to.
type openRecord struct {
// Range is the range requested by the call; "" denotes a full read.
Range string
// IfMatch is the If-Match validator the call was pinned to.
IfMatch string
}

// recordingReader serves byte ranges and records an openRecord for every Open
// call, so tests can assert how the object was fetched.
type recordingReader struct {
data []byte
etag string
Expand Down Expand Up @@ -234,8 +210,6 @@ func (r *recordingReader) Open(_ context.Context, _ client.Key, opts ...client.R
}

func TestParallelGetSingleWorkerFullRead(t *testing.T) {
// A concurrency of 1 gains nothing from chunking, so it must issue a single
// non-ranged read rather than discovering and serialising ranged GETs.
data := make([]byte, 1000)
for i := range data {
data[i] = byte(i % 251)
Expand Down Expand Up @@ -268,13 +242,77 @@ func TestParallelGetPinsChunksWithIfMatch(t *testing.T) {
}

// TestParallelGetNoETagSizeChangedBetweenRequests covers a multi-chunk object
// with no ETag, which falls back to a single full read. When the object is
// rewritten to a different size between discovery and that read, the discovery
// total must not be used to validate the full body: the full read is itself a
// consistent revision and is accepted in its entirety.
func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) {
	reader := &changingSizeReader{
		discovery: make([]byte, 1000),
		rewritten: []byte("changed"),
	}
	var out bufferAt

	ctx := context.Background()
	key := client.NewKey("k")
	err := client.ParallelGet(ctx, reader, key, &out, 100, 4)

	assert.NoError(t, err)
	assert.Equal(t, reader.rewritten, out.buf)
}

// TestParallelGetClient drives ParallelGet against the concrete Client talking
// to an httptest server. The server serves a fixed 1000-byte body, advertises a
// single ETag on every response, rejects a non-matching If-Match with 412,
// answers ranged requests with 206 and a Content-Range header, and answers a
// range starting past the end with 416. The reassembled bytes must equal the
// served body.
func TestParallelGetClient(t *testing.T) {
	content := make([]byte, 1000)
	for i := range content {
		content[i] = byte(i % 251)
	}
	etag := fakeETag(content)

	mux := http.NewServeMux()
	mux.HandleFunc("GET /api/v1/object/{namespace}/{key}", func(w http.ResponseWriter, r *http.Request) {
		if ifMatch := r.Header.Get("If-Match"); ifMatch != "" && ifMatch != etag {
			w.Header().Set(client.ETagKey, etag)
			w.WriteHeader(http.StatusPreconditionFailed)
			return
		}

		w.Header().Set(client.ETagKey, etag)
		rangeHeader := r.Header.Get("Range")
		if rangeHeader == "" {
			// No Range: serve the whole object.
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write(content)
			return
		}

		start, end, ok := parseTestByteRange(rangeHeader)
		if !ok || start < 0 || end < start {
			// Handlers run off the test goroutine, where only the non-fatal
			// reporting methods are permitted. Record the failure, answer
			// with 400 and stop, rather than serving a bogus range or
			// slicing content with inverted bounds.
			t.Errorf("unusable Range header %q", rangeHeader)
			http.Error(w, "invalid Range header", http.StatusBadRequest)
			return
		}
		if start >= int64(len(content)) {
			w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(content)))
			w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
			return
		}
		// The end position is inclusive; clamp it to the last byte.
		end = min(end, int64(len(content)-1))
		body := content[start : end+1]
		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(content)))
		w.Header().Set("Content-Length", strconv.Itoa(len(body)))
		w.WriteHeader(http.StatusPartialContent)
		_, _ = w.Write(body)
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	c := client.New(srv.URL, nil).Namespace("test")
	defer c.Close()
	var dst bufferAt
	err := client.ParallelGet(context.Background(), c, client.NewKey("parallel-client"), &dst, 100, 4)
	assert.NoError(t, err)
	assert.Equal(t, content, dst.buf)
}

// parseTestByteRange parses a single-range header of the form
// "bytes=<start>-<end>", where both positions are unsigned decimal integers and
// end is inclusive. It returns ok=false when the "bytes=" prefix is missing,
// either position is absent or not a plain non-negative number, or end is
// smaller than start. On failure start and end are zero.
func parseTestByteRange(header string) (start, end int64, ok bool) {
	spec, found := strings.CutPrefix(header, "bytes=")
	if !found {
		return 0, 0, false
	}
	startSpec, endSpec, found := strings.Cut(spec, "-")
	if !found {
		return 0, 0, false
	}
	// ParseUint rejects signs and empty strings; a bit size of 63 guarantees
	// the value fits in an int64 without overflow.
	first, err := strconv.ParseUint(startSpec, 10, 63)
	if err != nil {
		return 0, 0, false
	}
	last, err := strconv.ParseUint(endSpec, 10, 63)
	if err != nil {
		return 0, 0, false
	}
	// An inverted range is invalid and would make callers slice with
	// end < start.
	if last < first {
		return 0, 0, false
	}
	return int64(first), int64(last), true
}
Loading