Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions cmd/cache-proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand Down Expand Up @@ -185,6 +186,22 @@ func (p *CacheProxy) HandleProxy(w http.ResponseWriter, r *http.Request) {

data, contentType, source, err := p.fetchDedup(cacheKey, r, rangeHeader)
if err != nil {
// An origin that responded with a non-2xx (e.g. S3 returning a 400 with
// <Code>ExpiredToken</Code> in an XML envelope) is forwarded back to
// DuckDB verbatim — same status code, same body, same headers minus
// hop-by-hop. This preserves the error class so httpfs can distinguish
// transient (5xx) from terminal (4xx) failures, and gives DuckLake the
// raw S3 error body it knows how to parse.
var oe *originStatusError
if errors.As(err, &oe) {
slog.Warn("Origin returned non-2xx; forwarding verbatim.",
"url", r.URL.String(), "range", rangeHeader, "status", oe.status, "body_preview", previewBody(oe.body))
oe.writeTo(w)
return
}
// True transport-level failure (DNS, connection refused, TLS, timeout):
// no upstream status exists, so 502 Bad Gateway is the right answer
// here — and it's also the one DuckDB's httpfs treats as transient.
slog.Error("Failed to fetch.", "url", r.URL.String(), "range", rangeHeader, "error", err)
http.Error(w, err.Error(), http.StatusBadGateway)
return
Expand Down Expand Up @@ -251,8 +268,15 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) {
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 400 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
return nil, "", fmt.Errorf("origin %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
// Capture the body up to a generous cap. S3 error envelopes are
// typically <1 KiB; the cap is just a guard against a misbehaving
// origin streaming forever. The 60s context above also protects us.
body, _ := io.ReadAll(io.LimitReader(resp.Body, originErrorBodyCap))
return nil, "", &originStatusError{
status: resp.StatusCode,
headers: resp.Header.Clone(),
body: body,
}
}

data, err := io.ReadAll(resp.Body)
Expand All @@ -262,6 +286,53 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) {
return data, resp.Header.Get("Content-Type"), nil
}

// originErrorBodyCap is the maximum number of bytes we'll buffer from a
// non-2xx origin response. S3 XML error envelopes are tiny; this is just a
// safety net.
const originErrorBodyCap = 1 << 20 // 1 MiB

// originStatusError captures a non-2xx response from the origin so the
// proxy can forward it back to the client verbatim. The status code, body,
// and response headers are all preserved (minus hop-by-hop) so DuckDB sees
// exactly what S3 said — including the XML error envelope DuckLake may
// inspect.
type originStatusError struct {
status int
headers http.Header
body []byte
}

func (e *originStatusError) Error() string {
return fmt.Sprintf("origin %d: %s", e.status, strings.TrimSpace(string(e.body)))
}

// writeTo replays the captured origin response onto w. Any header the
// origin set that isn't a hop-by-hop is forwarded; status code and body
// follow.
func (e *originStatusError) writeTo(w http.ResponseWriter) {
for k, vv := range e.headers {
if hopByHop[strings.ToLower(k)] {
continue
}
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(e.status)
_, _ = w.Write(e.body)
}

// previewBody returns up to 256 bytes of the body for log lines so we don't
// spam multi-KiB XML envelopes into structured logs while still keeping the
// useful prefix (S3 puts the <Code>...</Code> first).
func previewBody(body []byte) string {
const max = 256
if len(body) <= max {
return string(body)
}
return string(body[:max]) + "...(truncated)"
}

// serveBody writes cached data back to the client, reconstructing 206 Partial
// Content semantics when the original request had a Range header.
func (p *CacheProxy) serveBody(w http.ResponseWriter, data []byte, rangeHeader, contentType string) {
Expand Down
134 changes: 128 additions & 6 deletions cmd/cache-proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,136 @@ func TestHandleProxyRejectsNonAbsoluteURL(t *testing.T) {
}
}

func TestHandleProxyOriginError(t *testing.T) {
// TestHandleProxyForwardsOrigin5xxVerbatim: any non-2xx the origin returns
// must be passed back to DuckDB unchanged. Translating a 500 into a 502
// (the old behaviour) made DuckDB's httpfs retry transient-class errors that
// were really terminal, and hid the real status from logs and the client.
func TestHandleProxyForwardsOrigin5xxVerbatim(t *testing.T) {
proxy := newTestProxy(t)
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "boom", http.StatusInternalServerError)
})
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/broken", http.Header{"Range": []string{"bytes=0-1"}})
if rec.Code != http.StatusInternalServerError {
t.Fatalf("status = %d, want 500 forwarded verbatim", rec.Code)
}
if !strings.Contains(rec.Body.String(), "boom") {
t.Errorf("body = %q, want it to contain origin body 'boom'", rec.Body.String())
}
}

// TestHandleProxyForwardsOrigin400Verbatim is the case the user actually
// hit: S3 returns 400 with an XML envelope (<Code>ExpiredToken</Code>) and
// DuckDB needs to see *that* body and *that* status, not a generic 502 with
// a Go-formatted error string. Without verbatim passthrough the error class
// (4xx terminal vs 5xx retriable) is lost and httpfs retries non-retriable
// auth failures.
func TestHandleProxyForwardsOrigin400Verbatim(t *testing.T) {
proxy := newTestProxy(t)
const errBody = `<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message></Error>`
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/xml")
w.Header().Set("X-Amz-Request-Id", "TESTREQID123")
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(errBody))
})
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/expired.parquet", http.Header{"Range": []string{"bytes=0-1023"}})

if rec.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want 400 forwarded verbatim", rec.Code)
}
if got := rec.Body.String(); got != errBody {
t.Errorf("body mismatch:\n got = %q\nwant = %q", got, errBody)
}
if ct := rec.Header().Get("Content-Type"); ct != "application/xml" {
t.Errorf("Content-Type = %q, want application/xml so DuckLake parses it as an S3 error envelope", ct)
}
if rid := rec.Header().Get("X-Amz-Request-Id"); rid != "TESTREQID123" {
t.Errorf("X-Amz-Request-Id = %q, want TESTREQID123 (preserved for debugging)", rid)
}
}

// TestHandleProxyForwardsOrigin404Verbatim: a 404 must stay a 404 so
// DuckDB / DuckLake can distinguish "object missing" (terminal) from
// "transient gateway error" (retriable).
func TestHandleProxyForwardsOrigin404Verbatim(t *testing.T) {
proxy := newTestProxy(t)
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("missing"))
})
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/gone.parquet", http.Header{"Range": []string{"bytes=0-1"}})
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want 404 forwarded verbatim", rec.Code)
}
}

// TestHandleProxyForwardsOrigin416Verbatim: Range Not Satisfiable carries
// semantically important metadata for DuckLake; collapsing to 502 made it
// look like a network error.
func TestHandleProxyForwardsOrigin416Verbatim(t *testing.T) {
proxy := newTestProxy(t)
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Range", "bytes */1024")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
})
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/short.parquet", http.Header{"Range": []string{"bytes=999999-1000000"}})
if rec.Code != http.StatusRequestedRangeNotSatisfiable {
t.Fatalf("status = %d, want 416 forwarded verbatim", rec.Code)
}
if cr := rec.Header().Get("Content-Range"); cr != "bytes */1024" {
t.Errorf("Content-Range = %q, want 'bytes */1024' (DuckDB uses this to learn the actual file size)", cr)
}
}

// TestHandleProxyDoesNotCacheErrorResponses: a transient origin error
// must not poison the cache. The next request for the same key has to hit
// the origin again — otherwise an ExpiredToken error would persist past
// the credential refresh that fixes it.
func TestHandleProxyDoesNotCacheErrorResponses(t *testing.T) {
proxy := newTestProxy(t)
var calls int32
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
n := atomic.AddInt32(&calls, 1)
if n == 1 {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("first call fails"))
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok-now"))
})

rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}})
if rec.Code != http.StatusBadRequest {
t.Fatalf("first call: status = %d, want 400", rec.Code)
}

rec = doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}})
if rec.Code != http.StatusPartialContent {
t.Fatalf("retry: status = %d, want 206 (cache must not have stored the prior error)", rec.Code)
}
if atomic.LoadInt32(&calls) != 2 {
t.Errorf("origin calls = %d, want 2 (the cache must not serve a previously-failed request from cache)", calls)
}
}

// TestHandleProxyNetworkErrorStill502: when the origin is fully
// unreachable (no HTTP response at all), 502 is still the right answer —
// there was no upstream status to forward, and httpfs's "retry on 5xx"
// behaviour is appropriate here.
func TestHandleProxyNetworkErrorStill502(t *testing.T) {
proxy := newTestProxy(t)
// Construct a URL that points at no listener: srv.Close before use.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
dead := srv.URL
srv.Close()

rec := doForwardProxyRequest(proxy, "GET", dead+"/bucket/anything", http.Header{"Range": []string{"bytes=0-1"}})
if rec.Code != http.StatusBadGateway {
t.Errorf("status = %d, want 502 on origin error", rec.Code)
t.Fatalf("status = %d, want 502 when origin is unreachable", rec.Code)
}
}

Expand Down Expand Up @@ -241,10 +363,10 @@ func TestFetchOriginPreservesSignedHeaders(t *testing.T) {
})

h := http.Header{
"Range": []string{"bytes=0-1"},
"Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"},
"X-Amz-Date": []string{"20260101T000000Z"},
"X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"},
"Range": []string{"bytes=0-1"},
"Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"},
"X-Amz-Date": []string{"20260101T000000Z"},
"X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"},
// Hop-by-hop header must NOT be forwarded.
"Proxy-Connection": []string{"Keep-Alive"},
}
Expand Down
Loading