diff --git a/cmd/cache-proxy/proxy.go b/cmd/cache-proxy/proxy.go index 7634b658..5ab6e4a9 100644 --- a/cmd/cache-proxy/proxy.go +++ b/cmd/cache-proxy/proxy.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "io" "log/slog" @@ -185,6 +186,22 @@ func (p *CacheProxy) HandleProxy(w http.ResponseWriter, r *http.Request) { data, contentType, source, err := p.fetchDedup(cacheKey, r, rangeHeader) if err != nil { + // An origin that responded with a non-2xx (e.g. S3 returning a 400 with + // ExpiredToken in an XML envelope) is forwarded back to + // DuckDB verbatim — same status code, same body, same headers minus + // hop-by-hop. This preserves the error class so httpfs can distinguish + // transient (5xx) from terminal (4xx) failures, and gives DuckLake the + // raw S3 error body it knows how to parse. + var oe *originStatusError + if errors.As(err, &oe) { + slog.Warn("Origin returned non-2xx; forwarding verbatim.", + "url", r.URL.String(), "range", rangeHeader, "status", oe.status, "body_preview", previewBody(oe.body)) + oe.writeTo(w) + return + } + // True transport-level failure (DNS, connection refused, TLS, timeout): + // no upstream status exists, so 502 Bad Gateway is the right answer + // here — and it's also the one DuckDB's httpfs treats as transient. slog.Error("Failed to fetch.", "url", r.URL.String(), "range", rangeHeader, "error", err) http.Error(w, err.Error(), http.StatusBadGateway) return @@ -251,8 +268,15 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) { defer func() { _ = resp.Body.Close() }() if resp.StatusCode >= 400 { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048)) - return nil, "", fmt.Errorf("origin %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + // Capture the body up to a generous cap. S3 error envelopes are + // typically <1 KiB; the cap is just a guard against a misbehaving + // origin streaming forever. The 60s context above also protects us. + body, _ := io.ReadAll(io.LimitReader(resp.Body, originErrorBodyCap)) + return nil, "", &originStatusError{ + status: resp.StatusCode, + headers: resp.Header.Clone(), + body: body, + } } data, err := io.ReadAll(resp.Body) @@ -262,6 +286,53 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) { return data, resp.Header.Get("Content-Type"), nil } +// originErrorBodyCap is the maximum number of bytes we'll buffer from a +// non-2xx origin response. S3 XML error envelopes are tiny; this is just a +// safety net. +const originErrorBodyCap = 1 << 20 // 1 MiB + +// originStatusError captures a non-2xx response from the origin so the +// proxy can forward it back to the client verbatim. The status code, body, +// and response headers are all preserved (minus hop-by-hop) so DuckDB sees +// exactly what S3 said — including the XML error envelope DuckLake may +// inspect. +type originStatusError struct { + status int + headers http.Header + body []byte +} + +func (e *originStatusError) Error() string { + return fmt.Sprintf("origin %d: %s", e.status, strings.TrimSpace(string(e.body))) +} + +// writeTo replays the captured origin response onto w. Any header the +// origin set that isn't a hop-by-hop is forwarded; status code and body +// follow. +func (e *originStatusError) writeTo(w http.ResponseWriter) { + for k, vv := range e.headers { + if hopByHop[strings.ToLower(k)] { + continue + } + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.WriteHeader(e.status) + _, _ = w.Write(e.body) +} + +// previewBody returns up to 256 bytes of the body for log lines so we don't +// spam multi-KiB XML envelopes into structured logs while still keeping the +// useful prefix (S3 puts the ... first). +func previewBody(body []byte) string { + const max = 256 + if len(body) <= max { + return string(body) + } + return string(body[:max]) + "...(truncated)" +} + // serveBody writes cached data back to the client, reconstructing 206 Partial // Content semantics when the original request had a Range header. func (p *CacheProxy) serveBody(w http.ResponseWriter, data []byte, rangeHeader, contentType string) { diff --git a/cmd/cache-proxy/proxy_test.go b/cmd/cache-proxy/proxy_test.go index ac29129e..3e552577 100644 --- a/cmd/cache-proxy/proxy_test.go +++ b/cmd/cache-proxy/proxy_test.go @@ -118,14 +118,136 @@ func TestHandleProxyRejectsNonAbsoluteURL(t *testing.T) { } } -func TestHandleProxyOriginError(t *testing.T) { +// TestHandleProxyForwardsOrigin5xxVerbatim: any non-2xx the origin returns +// must be passed back to DuckDB unchanged. Translating a 500 into a 502 +// (the old behaviour) made DuckDB's httpfs retry transient-class errors that +// were really terminal, and hid the real status from logs and the client. +func TestHandleProxyForwardsOrigin5xxVerbatim(t *testing.T) { proxy := newTestProxy(t) _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { http.Error(w, "boom", http.StatusInternalServerError) }) rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/broken", http.Header{"Range": []string{"bytes=0-1"}}) + if rec.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want 500 forwarded verbatim", rec.Code) + } + if !strings.Contains(rec.Body.String(), "boom") { + t.Errorf("body = %q, want it to contain origin body 'boom'", rec.Body.String()) + } +} + +// TestHandleProxyForwardsOrigin400Verbatim is the case the user actually +// hit: S3 returns 400 with an XML envelope (ExpiredToken) and +// DuckDB needs to see *that* body and *that* status, not a generic 502 with +// a Go-formatted error string. Without verbatim passthrough the error class +// (4xx terminal vs 5xx retriable) is lost and httpfs retries non-retriable +// auth failures. +func TestHandleProxyForwardsOrigin400Verbatim(t *testing.T) { + proxy := newTestProxy(t) + const errBody = ` +ExpiredTokenThe provided token has expired.` + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/xml") + w.Header().Set("X-Amz-Request-Id", "TESTREQID123") + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(errBody)) + }) + rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/expired.parquet", http.Header{"Range": []string{"bytes=0-1023"}}) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400 forwarded verbatim", rec.Code) + } + if got := rec.Body.String(); got != errBody { + t.Errorf("body mismatch:\n got = %q\nwant = %q", got, errBody) + } + if ct := rec.Header().Get("Content-Type"); ct != "application/xml" { + t.Errorf("Content-Type = %q, want application/xml so DuckLake parses it as an S3 error envelope", ct) + } + if rid := rec.Header().Get("X-Amz-Request-Id"); rid != "TESTREQID123" { + t.Errorf("X-Amz-Request-Id = %q, want TESTREQID123 (preserved for debugging)", rid) + } +} + +// TestHandleProxyForwardsOrigin404Verbatim: a 404 must stay a 404 so +// DuckDB / DuckLake can distinguish "object missing" (terminal) from +// "transient gateway error" (retriable). +func TestHandleProxyForwardsOrigin404Verbatim(t *testing.T) { + proxy := newTestProxy(t) + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte("missing")) + }) + rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/gone.parquet", http.Header{"Range": []string{"bytes=0-1"}}) + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404 forwarded verbatim", rec.Code) + } +} + +// TestHandleProxyForwardsOrigin416Verbatim: Range Not Satisfiable carries +// semantically important metadata for DuckLake; collapsing to 502 made it +// look like a network error. +func TestHandleProxyForwardsOrigin416Verbatim(t *testing.T) { + proxy := newTestProxy(t) + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Range", "bytes */1024") + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + }) + rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/short.parquet", http.Header{"Range": []string{"bytes=999999-1000000"}}) + if rec.Code != http.StatusRequestedRangeNotSatisfiable { + t.Fatalf("status = %d, want 416 forwarded verbatim", rec.Code) + } + if cr := rec.Header().Get("Content-Range"); cr != "bytes */1024" { + t.Errorf("Content-Range = %q, want 'bytes */1024' (DuckDB uses this to learn the actual file size)", cr) + } +} + +// TestHandleProxyDoesNotCacheErrorResponses: a transient origin error +// must not poison the cache. The next request for the same key has to hit +// the origin again — otherwise an ExpiredToken error would persist past +// the credential refresh that fixes it. +func TestHandleProxyDoesNotCacheErrorResponses(t *testing.T) { + proxy := newTestProxy(t) + var calls int32 + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&calls, 1) + if n == 1 { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("first call fails")) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok-now")) + }) + + rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}}) + if rec.Code != http.StatusBadRequest { + t.Fatalf("first call: status = %d, want 400", rec.Code) + } + + rec = doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}}) + if rec.Code != http.StatusPartialContent { + t.Fatalf("retry: status = %d, want 206 (cache must not have stored the prior error)", rec.Code) + } + if atomic.LoadInt32(&calls) != 2 { + t.Errorf("origin calls = %d, want 2 (the cache must not serve a previously-failed request from cache)", calls) + } +} + +// TestHandleProxyNetworkErrorStill502: when the origin is fully +// unreachable (no HTTP response at all), 502 is still the right answer — +// there was no upstream status to forward, and httpfs's "retry on 5xx" +// behaviour is appropriate here. +func TestHandleProxyNetworkErrorStill502(t *testing.T) { + proxy := newTestProxy(t) + // Construct a URL that points at no listener: srv.Close before use. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + dead := srv.URL + srv.Close() + + rec := doForwardProxyRequest(proxy, "GET", dead+"/bucket/anything", http.Header{"Range": []string{"bytes=0-1"}}) if rec.Code != http.StatusBadGateway { - t.Errorf("status = %d, want 502 on origin error", rec.Code) + t.Fatalf("status = %d, want 502 when origin is unreachable", rec.Code) } } @@ -241,10 +363,10 @@ func TestFetchOriginPreservesSignedHeaders(t *testing.T) { }) h := http.Header{ - "Range": []string{"bytes=0-1"}, - "Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"}, - "X-Amz-Date": []string{"20260101T000000Z"}, - "X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"}, + "Range": []string{"bytes=0-1"}, + "Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"}, + "X-Amz-Date": []string{"20260101T000000Z"}, + "X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"}, // Hop-by-hop header must NOT be forwarded. "Proxy-Connection": []string{"Keep-Alive"}, }