Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion pkg/model/provider/anthropic/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ package anthropic

import (
"errors"
"net/http"

"github.com/anthropics/anthropic-sdk-go"
"github.com/anthropics/anthropic-sdk-go/shared"

"github.com/docker/docker-agent/pkg/modelerrors"
)

// wrapAnthropicError wraps an Anthropic SDK error in a *modelerrors.StatusError
// to carry HTTP status code and Retry-After metadata for the retry loop.
// Non-Anthropic errors (e.g. io.EOF, network errors) pass through unchanged.
//
// Anthropic streams reply with HTTP 200 even when an error occurs mid-stream:
// the SSE stream contains a `type: error` event whose body looks like
//
// {"type":"error","error":{"type":"api_error","message":"Internal server error"}}
//
// In that case the SDK builds an *anthropic.Error with StatusCode == 200, which
// would short-circuit WrapHTTPError and surface the raw SDK message to the
// user. We map the in-band error type to its closest HTTP equivalent so the
// generic retry/format pipeline (modelerrors.ClassifyModelError, StatusError)
// behaves the same as for transport-level errors.
func wrapAnthropicError(err error) error {
if err == nil {
return nil
Expand All @@ -19,5 +32,37 @@ func wrapAnthropicError(err error) error {
if !ok {
return err
}
return modelerrors.WrapHTTPError(apiErr.StatusCode, apiErr.Response, err)
statusCode := apiErr.StatusCode
if statusCode < 400 {
statusCode = statusCodeForAnthropicErrorType(apiErr.Type())
}
return modelerrors.WrapHTTPError(statusCode, apiErr.Response, err)
}

// statusCodeForAnthropicErrorType translates the error type carried by an
// Anthropic in-band SSE error event (see shared.ErrorType) into the HTTP
// status code that has equivalent retry/fallback semantics.
//
// api_error, the empty type, and any type not listed below all resolve to
// 500, so they are handled as transient server errors and retried.
func statusCodeForAnthropicErrorType(t shared.ErrorType) int {
	// Anthropic's documented overloaded code; retryable. net/http has no
	// named constant for it.
	const statusOverloaded = 529

	switch t {
	case shared.ErrorTypeInvalidRequestError:
		return http.StatusBadRequest
	case shared.ErrorTypeAuthenticationError:
		return http.StatusUnauthorized
	case shared.ErrorTypeBillingError:
		return http.StatusPaymentRequired
	case shared.ErrorTypePermissionError:
		return http.StatusForbidden
	case shared.ErrorTypeNotFoundError:
		return http.StatusNotFound
	case shared.ErrorTypeRateLimitError:
		return http.StatusTooManyRequests
	case shared.ErrorTypeTimeoutError:
		return http.StatusGatewayTimeout
	case shared.ErrorTypeOverloadedError:
		return statusOverloaded
	default:
		// Covers shared.ErrorTypeAPIError as well as unknown or empty types.
		return http.StatusInternalServerError
	}
}
87 changes: 87 additions & 0 deletions pkg/model/provider/anthropic/wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ func makeTestAnthropicError(t *testing.T, statusCode int, retryAfterValue string
}
}

// makeTestSSEAnthropicError simulates the in-band SSE error path: the HTTP
// response was 200 OK but a `type:error` event arrived in the stream, so the
// SDK populated an *anthropic.Error with StatusCode == 200 and a body whose
// `error.type` indicates the actual failure (e.g. "api_error",
// "overloaded_error"). See https://github.com/docker/docker-agent/issues/2870.
//
// errorType and message are substituted into the `error.type` and
// `error.message` fields of the simulated event body. Any failure while
// building the fixture aborts the calling test immediately.
func makeTestSSEAnthropicError(t *testing.T, errorType, message string) *anthropic.Error {
	t.Helper()

	// Check the request-construction error instead of discarding it, so a
	// broken fixture fails here rather than yielding a nil Request.
	req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, "https://api.anthropic.com/v1/messages", http.NoBody)
	require.NoError(t, err)

	resp := httptest.NewRecorder().Result()
	resp.StatusCode = http.StatusOK

	apiErr := &anthropic.Error{
		StatusCode: http.StatusOK,
		Response:   resp,
		Request:    req,
		RequestID:  "req_test",
	}

	// %q wraps each value in double quotes and escapes embedded quotes, which
	// keeps the body well-formed for the plain strings the tests pass in.
	body := fmt.Sprintf(`{"type":"error","error":{"type":%q,"message":%q},"request_id":"req_test"}`, errorType, message)
	require.NoError(t, apiErr.UnmarshalJSON([]byte(body)))
	return apiErr
}

func TestWrapAnthropicError(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -103,4 +124,70 @@ func TestWrapAnthropicError(t *testing.T) {
assert.True(t, rateLimited)
assert.Equal(t, 5*time.Second, retryAfter)
})

// Issue #2870: SSE in-band errors arrive as *anthropic.Error with HTTP 200.
// We must synthesize a sensible HTTP status from the body's error.type so
// the generic retry/format pipeline kicks in and the user sees a friendly
// message instead of the raw `200 {"type":"error",...}` blob.
t.Run("sse in-band api_error becomes retryable HTTP 500", func(t *testing.T) {
t.Parallel()
apiErr := makeTestSSEAnthropicError(t, "api_error", "Internal server error")
result := wrapAnthropicError(apiErr)
var se *modelerrors.StatusError
require.ErrorAs(t, result, &se)
assert.Equal(t, http.StatusInternalServerError, se.StatusCode)
retryable, rateLimited, _ := modelerrors.ClassifyModelError(result)
assert.True(t, retryable, "api_error in SSE stream must be retryable")
assert.False(t, rateLimited)
// The user-facing message must surface error.type and error.message,
// not the raw "200 {...}" SDK blob.
assert.Contains(t, se.Error(), "api_error: Internal server error")
assert.NotContains(t, se.Error(), ": 200")
})

t.Run("sse in-band overloaded_error becomes retryable HTTP 529", func(t *testing.T) {
t.Parallel()
apiErr := makeTestSSEAnthropicError(t, "overloaded_error", "Anthropic is overloaded")
result := wrapAnthropicError(apiErr)
var se *modelerrors.StatusError
require.ErrorAs(t, result, &se)
assert.Equal(t, 529, se.StatusCode)
retryable, _, _ := modelerrors.ClassifyModelError(result)
assert.True(t, retryable)
})

t.Run("sse in-band rate_limit_error becomes rate-limited HTTP 429", func(t *testing.T) {
t.Parallel()
apiErr := makeTestSSEAnthropicError(t, "rate_limit_error", "Slow down")
result := wrapAnthropicError(apiErr)
var se *modelerrors.StatusError
require.ErrorAs(t, result, &se)
assert.Equal(t, http.StatusTooManyRequests, se.StatusCode)
retryable, rateLimited, _ := modelerrors.ClassifyModelError(result)
assert.False(t, retryable)
assert.True(t, rateLimited)
})

t.Run("sse in-band authentication_error is not retryable", func(t *testing.T) {
t.Parallel()
apiErr := makeTestSSEAnthropicError(t, "authentication_error", "Invalid API key")
result := wrapAnthropicError(apiErr)
var se *modelerrors.StatusError
require.ErrorAs(t, result, &se)
assert.Equal(t, http.StatusUnauthorized, se.StatusCode)
retryable, rateLimited, _ := modelerrors.ClassifyModelError(result)
assert.False(t, retryable, "auth errors must not be retried")
assert.False(t, rateLimited)
})

t.Run("sse in-band unknown error type defaults to retryable HTTP 500", func(t *testing.T) {
t.Parallel()
apiErr := makeTestSSEAnthropicError(t, "some_new_error_type", "unknown")
result := wrapAnthropicError(apiErr)
var se *modelerrors.StatusError
require.ErrorAs(t, result, &se)
assert.Equal(t, http.StatusInternalServerError, se.StatusCode)
retryable, _, _ := modelerrors.ClassifyModelError(result)
assert.True(t, retryable, "unknown SSE errors should be treated as transient")
})
}
Loading