scheduler_processor.go
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/worker/scheduler_processor.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package worker

import (
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util/httpgrpcutil"
util_log "github.com/grafana/mimir/pkg/util/log"
)
const (
// ResponseStreamingEnabledHeader is the header key used by http handlers to
// indicate to the scheduler processor that its response should be streamed. This
// header is internal to the querier only and removed before the response is sent
// over the network.
ResponseStreamingEnabledHeader = "X-Mimir-Stream-Grpc-Response"
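// responseStreamingBodyChunkSizeBytes is the size of each body chunk sent over the response stream.
// Responses whose body is not larger than this are sent in a single non-streaming message instead.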
responseStreamingBodyChunkSizeBytes = 1 * 1024 * 1024
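// maxNotifyFrontendRetries is the maximum number of attempts made to notify the query-frontend about a finished query.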
maxNotifyFrontendRetries = 5
)
var errQuerierQuerySchedulerProcessingLoopTerminated = cancellation.NewErrorf("querier query-scheduler processing loop terminated")
var errQueryEvaluationFinished = cancellation.NewErrorf("query evaluation finished")
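// newSchedulerProcessor creates a schedulerProcessor together with the list of services
// (currently only the query-frontend client pool) that must be started for it to work.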
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
p := &schedulerProcessor{
log: log,
handler: handler,
streamResponse: streamResponse,
maxMessageSize: cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.QueryFrontendGRPCClientConfig,
streamingEnabled: cfg.ResponseStreamingEnabled,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_querier_query_frontend_request_duration_seconds",
Help: "Time spent doing requests to the frontend.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"}),
}
frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_querier_query_frontend_clients",
Help: "The current number of clients connected to query-frontend.",
})
poolConfig := client.PoolConfig{
CheckInterval: 5 * time.Second,
HealthCheckEnabled: true,
HealthCheckTimeout: 1 * time.Second,
}
p.frontendPool = client.NewPool("frontend", poolConfig, nil, client.PoolAddrFunc(p.createFrontendClient), frontendClientsGauge, log)
return p, []services.Service{p.frontendPool}
}
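// frontendResponseStreamer is the signature of the function used to stream a query result back to
// the query-frontend; see streamResponse for the implementation used at runtime.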
type frontendResponseStreamer func(
ctx context.Context,
reqCtx context.Context,
c client.PoolClient,
queryID uint64,
response *httpgrpc.HTTPResponse,
stats *querier_stats.Stats,
logger log.Logger) error
// Handles incoming queries from query-scheduler.
type schedulerProcessor struct {
log log.Logger
handler RequestHandler
streamResponse frontendResponseStreamer
grpcConfig grpcclient.Config
maxMessageSize int
querierID string
streamingEnabled bool
frontendPool *client.Pool
frontendClientRequestDuration *prometheus.HistogramVec
schedulerClientFactory func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient
}
// notifyShutdown implements processor.
func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
client := sp.schedulerClientFactory(conn)
req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID}
if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil {
// Since we're shutting down there's nothing we can do except logging it.
level.Warn(sp.log).Log("msg", "failed to notify querier shutdown to query-scheduler", "address", address, "err", err)
}
}
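// processQueriesOnSingleStream implements processor. It opens a QuerierLoop stream to the
// query-scheduler at the given address and processes queries received on it, reconnecting
// with backoff whenever the loop fails.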
func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address string) {
schedulerClient := sp.schedulerClientFactory(conn)
// Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context".
// The execution context is cancelled once the workerCtx is cancelled AND there's no inflight query executing.
execCtx, execCancel, inflightQuery := newExecutionContext(workerCtx, sp.log)
defer execCancel(errQuerierQuerySchedulerProcessingLoopTerminated)
backoff := backoff.New(execCtx, processorBackoffConfig)
for backoff.Ongoing() {
c, err := schedulerClient.QuerierLoop(execCtx)
if err == nil {
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
}
if err != nil {
level.Warn(sp.log).Log("msg", "error contacting scheduler", "err", err, "addr", address)
backoff.Wait()
continue
}
if err := sp.querierLoop(execCtx, c, address, inflightQuery); err != nil {
if !isErrCancel(err, log.With(sp.log, "addr", address)) {
// Do not log an error if the query-scheduler is shutting down.
if s, ok := grpcutil.ErrorToStatus(err); !ok || !strings.Contains(s.Message(), schedulerpb.ErrSchedulerIsNotRunning.Error()) {
level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
}
backoff.Wait()
continue
}
}
backoff.Reset()
}
}
// querierLoop loops processing requests on an established stream to the query-scheduler.
func (sp *schedulerProcessor) querierLoop(execCtx context.Context, c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool) (err error) {
// Build a child context so we can cancel a query when the stream is closed.
// Note that we deliberately don't use c.Context() here, as that is cancelled as soon as the gRPC client observes an error,
// but we don't always want to cancel queries if the scheduler stream reports an error (eg. if the scheduler crashed).
ctx, cancel := context.WithCancelCause(execCtx)
defer func() {
cancel(cancellation.NewErrorf("query-scheduler loop in querier for query-scheduler %v terminated with error: %w", address, err))
}()
queryComplete := make(chan struct{})
close(queryComplete) // Close the channel (signaling no query in progress) to simplify the logic below in the case where we receive no queries.
waitForQuery := func(err error) {
select {
case <-queryComplete:
// Query is already complete, nothing to do.
return
default:
// Query is not complete.
level.Info(sp.log).Log("msg", "query-scheduler loop in querier received non-cancellation error, waiting for inflight query to complete...", "err", err, "addr", address)
<-queryComplete
level.Info(sp.log).Log("msg", "query-scheduler loop in querier received non-cancellation error and inflight query is complete, continuing", "err", err, "addr", address)
}
}
schedulerStreamError := atomic.NewError(nil)
for {
request, err := c.Recv()
if err != nil {
schedulerStreamError.Store(err)
if grpcutil.IsCanceled(err) {
cancel(cancellation.NewErrorf("query cancelled: %w", err))
} else {
// If we got another kind of error (eg. scheduler crashed), continue processing the query.
waitForQuery(err)
}
return err
}
inflightQuery.Store(true)
queryComplete = make(chan struct{})
// Handle the request on a "background" goroutine, so we go back to
// blocking on c.Recv(). This allows us to detect the stream closing
// and cancel the query. We don't actually handle queries in parallel
// here, as we're running in lock step with the server - each Recv is
// paired with a Send.
go func() {
defer inflightQuery.Store(false)
defer close(queryComplete)
// Create a per-request context and cancel it once we're done processing the request.
// This is important for queries that stream chunks from ingesters to the querier, as SeriesChunksStreamReader relies
// on the context being cancelled to abort streaming and terminate a goroutine if the query is aborted. Requests that
// go direct to a querier's HTTP API have a context created and cancelled in a similar way by the Go runtime's
// net/http package.
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errQueryEvaluationFinished)
// We need to inject user into context for sending response back.
ctx = user.InjectOrgID(ctx, request.UserID)
tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
if parentSpanContext != nil {
queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
defer queueSpan.Finish()
ctx = spanCtx
if err := sp.updateTracingHeaders(request.HttpRequest, queueSpan); err != nil {
level.Warn(sp.log).Log("msg", "could not update trace headers on httpgrpc request, trace may be malformed", "err", err)
}
}
logger := util_log.WithContext(ctx, sp.log)
sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest, time.Duration(request.QueueTimeNanos))
// Report back to scheduler that processing of the query has finished.
if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil {
if previousErr := schedulerStreamError.Load(); previousErr != nil {
// If the stream has already been broken, it's expected that the Send() call will fail too.
// The error returned by Recv() is often more descriptive, so we include it in this log line as well.
level.Error(logger).Log(
"msg", "error notifying scheduler about finished query after the scheduler stream previously failed and returned error",
"err", err,
"addr", address,
"previousErr", previousErr,
)
} else {
level.Error(logger).Log("msg", "error notifying scheduler about finished query", "err", err, "addr", address)
}
}
}()
}
}
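// runRequest executes a single query through the request handler and reports the result back to the
// query-frontend at frontendAddress, retrying the notification with backoff if it fails.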
func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest, queueTime time.Duration) {
var stats *querier_stats.Stats
if statsEnabled {
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
stats.AddQueueTime(queueTime)
}
response, err := sp.handler.Handle(ctx, request)
if err != nil {
var ok bool
response, ok = httpgrpc.HTTPResponseFromError(err)
if !ok {
response = &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(err.Error()),
}
}
}
// Ensure responses that are too big are not retried.
if len(response.Body) >= sp.maxMessageSize {
level.Error(logger).Log("msg", "response larger than max message size", "size", len(response.Body), "maxMessageSize", sp.maxMessageSize)
errMsg := fmt.Sprintf("response larger than the max message size (%d vs %d)", len(response.Body), sp.maxMessageSize)
response = &httpgrpc.HTTPResponse{
Code: http.StatusRequestEntityTooLarge,
Body: []byte(errMsg),
}
}
var c client.PoolClient
// Even if this query has been cancelled, we still want to tell the frontend about it, otherwise the frontend will wait for a result until it times out.
frontendCtx := context.WithoutCancel(ctx)
bof := backoff.New(frontendCtx, backoff.Config{
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
MaxRetries: maxNotifyFrontendRetries,
})
var hasStreamHeader bool
response.Headers, hasStreamHeader = removeStreamingHeader(response.Headers)
shouldStream := hasStreamHeader && sp.streamingEnabled && len(response.Body) > responseStreamingBodyChunkSizeBytes
// Protect against not-yet-exited querier handler goroutines that could
// still be incrementing stats when sent for marshaling below.
stats = stats.Copy()
for bof.Ongoing() {
c, err = sp.frontendPool.GetClientFor(frontendAddress)
if err != nil {
break
}
if shouldStream {
err = sp.streamResponse(frontendCtx, ctx, c, queryID, response, stats, sp.log)
} else {
// The QueryResult response message is empty and uninteresting, so we ignore it.
_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(frontendCtx, &frontendv2pb.QueryResultRequest{
QueryID: queryID,
HttpResponse: response,
Stats: stats,
})
}
if err == nil {
break
}
level.Warn(logger).Log("msg", "retrying to notify frontend about finished query", "err", err, "frontend", frontendAddress, "retries", bof.NumRetries(), "query_id", queryID)
sp.frontendPool.RemoveClient(c, frontendAddress)
bof.Wait()
}
if err != nil {
level.Error(logger).Log("msg", "error notifying frontend about finished query", "err", err, "frontend", frontendAddress, "query_id", queryID)
}
}
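// removeStreamingHeader strips ResponseStreamingEnabledHeader from the given headers and
// reports whether it was set to "true".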
func removeStreamingHeader(headers []*httpgrpc.Header) ([]*httpgrpc.Header, bool) {
streamEnabledViaHeader := false
for i, header := range headers {
if header.Key == ResponseStreamingEnabledHeader {
if header.Values[0] == "true" {
streamEnabledViaHeader = true
}
headers = append(headers[:i], headers[i+1:]...)
break
}
}
return headers, streamEnabledViaHeader
}
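// streamResponse sends a query result to the query-frontend over a QueryResultStream: first the
// metadata (status code, headers and stats), then the body in chunks of responseStreamingBodyChunkSizeBytes.
// Once the metadata has been sent, errors are logged but not returned, so the caller does not retry
// and resend the metadata.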
func streamResponse(
ctx context.Context,
reqCtx context.Context,
c client.PoolClient,
queryID uint64,
response *httpgrpc.HTTPResponse,
stats *querier_stats.Stats,
logger log.Logger,
) error {
sc, err := c.(frontendv2pb.FrontendForQuerierClient).QueryResultStream(ctx)
if err != nil {
return fmt.Errorf("error creating stream to frontend: %w", err)
}
// Send metadata
err = sc.Send(&frontendv2pb.QueryResultStreamRequest{
QueryID: queryID,
Data: &frontendv2pb.QueryResultStreamRequest_Metadata{Metadata: &frontendv2pb.QueryResultMetadata{
Code: response.Code,
Headers: response.Headers,
Stats: stats,
}},
})
if err != nil {
return fmt.Errorf("error sending initial response to frontend: %w", err)
}
// The response metadata has been sent successfully. After this point we can no longer
// return an error from this function as that would cause the response metadata to be sent
// again. This would be rejected by the frontend and the retry could never succeed.
sendBody:
// Send body chunks.
for offset := 0; offset < len(response.Body); {
select {
case <-reqCtx.Done():
level.Warn(logger).Log("msg", "response stream aborted", "cause", context.Cause(reqCtx))
break sendBody
default:
err = sc.Send(&frontendv2pb.QueryResultStreamRequest{
QueryID: queryID,
Data: &frontendv2pb.QueryResultStreamRequest_Body{Body: &frontendv2pb.QueryResultBody{
Chunk: response.Body[offset:min(offset+responseStreamingBodyChunkSizeBytes, len(response.Body))],
}},
})
if err != nil {
level.Warn(logger).Log("msg", "error streaming response body to frontend, aborting response stream", "err", err)
break sendBody
}
offset += responseStreamingBodyChunkSizeBytes
}
}
// Ignore error here because there's nothing we can do about it.
_, _ = sc.CloseAndRecv()
return nil
}
func (sp *schedulerProcessor) updateTracingHeaders(request *httpgrpc.HTTPRequest, span opentracing.Span) error {
// Reset any trace headers on the HTTP request with the new parent span ID: the child span for the HTTP request created
// by the HTTP tracing infrastructure uses the trace information in the HTTP request headers, ignoring the trace
// information in the Golang context.
return span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, httpGrpcHeaderWriter{request})
}
type httpGrpcHeaderWriter struct {
request *httpgrpc.HTTPRequest
}
var _ opentracing.TextMapWriter = httpGrpcHeaderWriter{}
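// Set implements opentracing.TextMapWriter. It overwrites the value of an existing header with the
// same key, or appends a new header otherwise.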
func (w httpGrpcHeaderWriter) Set(key, val string) {
for _, h := range w.request.Headers {
if h.Key == key {
h.Values = []string{val}
return
}
}
w.request.Headers = append(w.request.Headers, &httpgrpc.Header{Key: key, Values: []string{val}})
}
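// createFrontendClient dials the query-frontend at the given address and wraps the connection in a
// frontendClient so it can be used and health-checked by the client pool.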
func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClient, error) {
opts, err := sp.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
middleware.UnaryClientInstrumentInterceptor(sp.frontendClientRequestDuration),
}, []grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
middleware.StreamClientInstrumentInterceptor(sp.frontendClientRequestDuration),
})
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
return &frontendClient{
FrontendForQuerierClient: frontendv2pb.NewFrontendForQuerierClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil
}
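// frontendClient bundles the FrontendForQuerier and health gRPC clients with their underlying
// connection so the connection can be closed via Close().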
type frontendClient struct {
frontendv2pb.FrontendForQuerierClient
grpc_health_v1.HealthClient
conn *grpc.ClientConn
}
func (fc *frontendClient) Close() error {
return fc.conn.Close()
}