Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the response status code logged in 'query stats' on error #8407

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and vice versa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
* [BUGFIX] Ingester, store-gateway: fix case insensitive regular expressions not matching correctly some Unicode characters. #8391
* [BUGFIX] Query-frontend: "query stats" log now includes the actual `status_code` when the request fails due to an error occurring in the query-frontend itself. #8407

### Mixin

Expand Down
34 changes: 26 additions & 8 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queryResponseTime := time.Since(startTime)

if err != nil {
writeError(w, err)
f.reportQueryStats(r, params, startTime, queryResponseTime, 0, queryDetails, 0, err)
statusCode := writeError(w, err)
f.reportQueryStats(r, params, startTime, queryResponseTime, 0, queryDetails, statusCode, err)
return
}

Expand Down Expand Up @@ -428,7 +428,8 @@ func formatRequestHeaders(h *http.Header, headersToLog []string) (fields []any)
return fields
}

func writeError(w http.ResponseWriter, err error) {
// writeError writes the error response to http.ResponseWriter, and returns the response HTTP status code.
func writeError(w http.ResponseWriter, err error) int {
switch {
case errors.Is(err, context.Canceled):
err = errCanceled
Expand All @@ -440,13 +441,30 @@ func writeError(w http.ResponseWriter, err error) {
}
}

// if the error is an APIError, ensure it gets written as a JSON response
if resp, ok := apierror.HTTPResponseFromError(err); ok {
_ = httpgrpc.WriteResponse(w, resp)
return
var (
res *httpgrpc.HTTPResponse
resFound bool
)

// If the error is an APIError, ensure it gets written as a JSON response.
// Otherwise, check if there's a response encoded in the gRPC error.
res, resFound = apierror.HTTPResponseFromError(err)
if !resFound {
res, resFound = httpgrpc.HTTPResponseFromError(err)
}

// If we've been able to get the HTTP response from the error, then we send
// it with the right status code and response body content.
if resFound {
_ = httpgrpc.WriteResponse(w, res)
return int(res.Code)
}

httpgrpc.WriteError(w, err)
// Otherwise, we fall back to a 5xx error, returning the non-formatted error
// message in the response body.
statusCode := http.StatusInternalServerError
http.Error(w, err.Error(), statusCode)
return statusCode
}

func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
Expand Down
105 changes: 95 additions & 10 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/activitytracker"
Expand All @@ -54,12 +55,19 @@ func TestWriteError(t *testing.T) {
}{
{http.StatusInternalServerError, errors.New("unknown")},
{http.StatusGatewayTimeout, context.DeadlineExceeded},
{http.StatusGatewayTimeout, errors.Wrap(context.DeadlineExceeded, "an error occurred")},
{StatusClientClosedRequest, context.Canceled},
{StatusClientClosedRequest, errors.Wrap(context.Canceled, "an error occurred")},
{http.StatusBadRequest, httpgrpc.Errorf(http.StatusBadRequest, "")},
{http.StatusBadRequest, errors.Wrap(httpgrpc.Errorf(http.StatusBadRequest, ""), "an error occurred")},
{http.StatusBadRequest, apierror.New(apierror.TypeBadData, "")},
pracucci marked this conversation as resolved.
Show resolved Hide resolved
{http.StatusBadRequest, errors.Wrap(apierror.New(apierror.TypeBadData, "invalid request"), "an error occurred")},
{http.StatusNotFound, apierror.New(apierror.TypeNotFound, "")},
{http.StatusNotFound, errors.Wrap(apierror.New(apierror.TypeNotFound, "invalid request"), "an error occurred")},
} {
t.Run(test.err.Error(), func(t *testing.T) {
w := httptest.NewRecorder()
writeError(w, test.err)
require.Equal(t, test.status, writeError(w, test.err))
require.Equal(t, test.status, w.Result().StatusCode)
})
}
Expand All @@ -68,10 +76,20 @@ func TestWriteError(t *testing.T) {
func TestHandler_ServeHTTP(t *testing.T) {
const testRouteName = "the_test_route"

makeSuccessfulDownstreamResponse := func() *http.Response {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("{}")),
}
}

for _, tt := range []struct {
name string
cfg HandlerConfig
request func() *http.Request
downstreamResponse *http.Response
downstreamErr error
expectedStatusCode int
expectedParams url.Values
expectedMetrics int
expectedActivity string
Expand All @@ -90,6 +108,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Header.Add("User-Agent", "test-user-agent")
return r
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
Expand All @@ -106,6 +126,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Header.Add("User-Agent", "test-user-agent")
return r
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
Expand All @@ -122,6 +144,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Header.Add("User-Agent", "test-user-agent")
return r.WithContext(api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong))
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
Expand All @@ -138,6 +162,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Header.Add("User-Agent", "test-user-agent")
return r
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{},
expectedMetrics: 5,
expectedActivity: "user:12345 UA:test-user-agent req:GET /api/v1/query (no params)",
Expand All @@ -151,6 +177,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Header.Add("User-Agent", "test-user-agent")
return r
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
Expand Down Expand Up @@ -194,8 +222,10 @@ func TestHandler_ServeHTTP(t *testing.T) {
r.Body = io.NopCloser(bytes.NewReader(compressed))
return r
},
expectedActivity: "user:12345 UA:test-user-agent req:GET /api/v1/read end_0=42&end_1=20&hints_1=%7B%22step_ms%22%3A1000%7D&matchers_0=%7B__name__%3D%22some_metric%22%2Cfoo%3D~%22.%2Abar.%2A%22%7D&matchers_1=%7B__name__%3D%22up%22%7D&start_0=0&start_1=10",
expectedMetrics: 5,
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedActivity: "user:12345 UA:test-user-agent req:GET /api/v1/read end_0=42&end_1=20&hints_1=%7B%22step_ms%22%3A1000%7D&matchers_0=%7B__name__%3D%22some_metric%22%2Cfoo%3D~%22.%2Abar.%2A%22%7D&matchers_1=%7B__name__%3D%22up%22%7D&start_0=0&start_1=10",
expectedMetrics: 5,
expectedStatusCode: 200,
expectedParams: url.Values{
"matchers_0": []string{"{__name__=\"some_metric\",foo=~\".*bar.*\"}"},
"start_0": []string{"0"},
Expand All @@ -206,6 +236,54 @@ func TestHandler_ServeHTTP(t *testing.T) {
"hints_1": []string{"{\"step_ms\":1000}"},
},
},
{
name: "downstream returns an apierror with 4xx status code",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
return httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
},
downstreamErr: apierror.New(apierror.TypeBadData, "invalid request"),
expectedStatusCode: 400,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "downstream returns a gRPC error with 4xx status code",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
return httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
},
downstreamErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid request"),
expectedStatusCode: 400,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "downstream returns a generic error",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
return httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
},
downstreamErr: errors.New("something unexpected happened"),
expectedStatusCode: 500,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
} {
t.Run(tt.name, func(t *testing.T) {
activityFile := filepath.Join(t.TempDir(), "activity-tracker")
Expand All @@ -221,10 +299,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
assert.Equal(t, tt.expectedParams, req.Form)
}

return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
return tt.downstreamResponse, tt.downstreamErr
})

reg := prometheus.NewPedanticRegistry()
Expand All @@ -243,7 +318,7 @@ func TestHandler_ServeHTTP(t *testing.T) {

handler.ServeHTTP(resp, req)
responseData, _ := io.ReadAll(resp.Body)
require.Equal(t, http.StatusOK, resp.Code)
require.Equal(t, tt.expectedStatusCode, resp.Code)

count, err := promtest.GatherAndCount(
reg,
Expand All @@ -268,14 +343,13 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.Equal(t, level.InfoValue(), msg["level"])
require.Equal(t, "query stats", msg["msg"])
require.Equal(t, "query-frontend", msg["component"])
require.Equal(t, "success", msg["status"])
require.EqualValues(t, tt.expectedStatusCode, msg["status_code"])
require.Equal(t, "12345", msg["user"])
require.Equal(t, req.Method, msg["method"])
require.Equal(t, req.URL.Path, msg["path"])
require.Equal(t, testRouteName, msg["route_name"])
require.Equal(t, req.UserAgent(), msg["user_agent"])
require.Contains(t, msg, "response_time")
require.Equal(t, int64(len(responseData)), msg["response_size_bytes"])
require.Contains(t, msg, "query_wall_time_seconds")
require.EqualValues(t, 0, msg["fetched_series_count"])
require.EqualValues(t, 0, msg["fetched_chunk_bytes"])
Expand All @@ -286,6 +360,17 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.EqualValues(t, 0, msg["estimated_series_count"])
require.EqualValues(t, 0, msg["queue_time_seconds"])

if tt.expectedStatusCode >= 200 && tt.expectedStatusCode < 300 {
require.Equal(t, "success", msg["status"])
} else {
require.Equal(t, "failed", msg["status"])
}

// The response size is tracked only for successful requests.
if tt.expectedStatusCode >= 200 && tt.expectedStatusCode < 300 {
require.Equal(t, int64(len(responseData)), msg["response_size_bytes"])
}

// Check that the HTTP or Protobuf request parameters are logged.
paramsLogged := 0
for key := range msg {
Expand Down
Loading