Skip to content

Commit

Permalink
Split query by interval (grafana/phlare#713)
Browse files Browse the repository at this point in the history
* Separate handlers in querier fronted

* Clarify http responce decompression implementation details

* Draft query time split

* Split SelectSeries by time

* Align interval to step duration

* Remove unused code

* Add querier.max-concurrent option

* Fix connect headers
  • Loading branch information
kolesnikovae committed May 31, 2023
1 parent 01786d6 commit 4787ad4
Show file tree
Hide file tree
Showing 34 changed files with 869 additions and 69 deletions.
6 changes: 5 additions & 1 deletion cmd/phlare/help-all.txt.tmpl
Expand Up @@ -379,12 +379,16 @@ Usage of ./phlare:
Timeout for ingester client healthcheck RPCs. (default 5s)
-querier.id string
Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.
-querier.max-concurrent int
The maximum number of concurrent queries allowed. (default 4)
-querier.max-query-length duration
The limit to length of queries. 0 to disable. (default 30d1h)
-querier.max-query-lookback duration
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.
-querier.max-query-parallelism int
Maximum number of queries that will be scheduled in parallel by the frontend. (default 32)
Maximum number of queries that will be scheduled in parallel by the frontend.
-querier.split-queries-by-interval duration
Split queries by a time interval and execute in parallel. The value 0 disables splitting by time
-query-frontend.grpc-client-config.backoff-max-period duration
Maximum delay when backing off. (default 10s)
-query-frontend.grpc-client-config.backoff-min-period duration
Expand Down
4 changes: 3 additions & 1 deletion cmd/phlare/help.txt.tmpl
Expand Up @@ -134,7 +134,9 @@ Usage of ./phlare:
-querier.max-query-lookback duration
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.
-querier.max-query-parallelism int
Maximum number of queries that will be scheduled in parallel by the frontend. (default 32)
Maximum number of queries that will be scheduled in parallel by the frontend.
-querier.split-queries-by-interval duration
Split queries by a time interval and execute in parallel. The value 0 disables splitting by time
-query-scheduler.max-outstanding-requests-per-tenant int
Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100)
-query-scheduler.ring.consul.hostname string
Expand Down
Expand Up @@ -229,7 +229,12 @@ limits:
# Maximum number of queries that will be scheduled in parallel by the
# frontend.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 32]
[max_query_parallelism: <int> | default = 0]

# Split queries by a time interval and execute in parallel. The value 0
# disables splitting by time
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

# The query_scheduler block configures the query-scheduler.
[query_scheduler: <query_scheduler>]
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/api.go
Expand Up @@ -219,9 +219,11 @@ func (a *API) RegisterRing(r http.Handler) {

// RegisterQuerier registers the endpoints associated with the querier.
func (a *API) RegisterQuerier(svc querierv1connect.QuerierServiceHandler) {
handlers := querier.NewHTTPHandlers(svc)
querierv1connect.RegisterQuerierServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.grpcLogMiddleware)
}

func (a *API) RegisterPyroscopeHandlers(client querierv1connect.QuerierServiceClient) {
handlers := querier.NewHTTPHandlers(client)
a.RegisterRoute("/pyroscope/render", http.HandlerFunc(handlers.Render), true, true, "GET")
a.RegisterRoute("/pyroscope/render-diff", http.HandlerFunc(handlers.RenderDiff), true, true, "GET")
a.RegisterRoute("/pyroscope/label-values", http.HandlerFunc(handlers.LabelValues), true, true, "GET")
Expand Down
12 changes: 9 additions & 3 deletions pkg/frontend/frontend.go
Expand Up @@ -86,12 +86,18 @@ type Frontend struct {
// frontend workers will read from this channel, and send request to scheduler.
requestsCh chan *frontendRequest

limits Limits
schedulerWorkers *frontendSchedulerWorkers
schedulerWorkersWatcher *services.FailureWatcher
requests *requestsInProgress
frontendpb.UnimplementedFrontendForQuerierServer
}

type Limits interface {
QuerySplitDuration(string) time.Duration
MaxQueryParallelism(string) int
}

type frontendRequest struct {
queryID uint64
request *httpgrpc.HTTPRequest
Expand Down Expand Up @@ -121,7 +127,7 @@ type enqueueResult struct {
}

// NewFrontend creates a new frontend.
func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Frontend, error) {
func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Registerer) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)

schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log, reg)
Expand All @@ -132,6 +138,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Fronte
f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
requestsCh: requestsCh,
schedulerWorkers: schedulerWorkers,
schedulerWorkersWatcher: services.NewFailureWatcher(),
Expand Down Expand Up @@ -259,8 +266,7 @@ enqueueAgain:

case resp := <-freq.response:
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
stats := stats.FromContext(ctx)
stats.Merge(resp.Stats) // Safe if stats is nil.
stats.FromContext(ctx).Merge(resp.Stats) // Safe if stats is nil.
}

return resp.HttpResponse, nil
Expand Down
14 changes: 14 additions & 0 deletions pkg/frontend/frontend_diff.go
@@ -0,0 +1,14 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) Diff(ctx context.Context, c *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) {
return connectgrpc.RoundTripUnary[querierv1.DiffRequest, querierv1.DiffResponse](ctx, f, c)
}
14 changes: 14 additions & 0 deletions pkg/frontend/frontend_label_names.go
@@ -0,0 +1,14 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) LabelNames(ctx context.Context, c *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
return connectgrpc.RoundTripUnary[typesv1.LabelNamesRequest, typesv1.LabelNamesResponse](ctx, f, c)
}
14 changes: 14 additions & 0 deletions pkg/frontend/frontend_label_values.go
@@ -0,0 +1,14 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) LabelValues(ctx context.Context, c *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
return connectgrpc.RoundTripUnary[typesv1.LabelValuesRequest, typesv1.LabelValuesResponse](ctx, f, c)
}
14 changes: 14 additions & 0 deletions pkg/frontend/frontend_profile_types.go
@@ -0,0 +1,14 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) ProfileTypes(ctx context.Context, c *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
return connectgrpc.RoundTripUnary[querierv1.ProfileTypesRequest, querierv1.ProfileTypesResponse](ctx, f, c)
}
15 changes: 15 additions & 0 deletions pkg/frontend/frontend_select_merge_profile.go
@@ -0,0 +1,15 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) SelectMergeProfile(ctx context.Context, c *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[profilev1.Profile], error) {
return connectgrpc.RoundTripUnary[querierv1.SelectMergeProfileRequest, profilev1.Profile](ctx, f, c)
}
64 changes: 64 additions & 0 deletions pkg/frontend/frontend_select_merge_stacktraces.go
@@ -0,0 +1,64 @@
package frontend

import (
"context"
"net/http"
"time"

"github.com/bufbuild/connect-go"
"github.com/grafana/dskit/tenant"
"golang.org/x/sync/errgroup"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/util/connectgrpc"
"github.com/grafana/phlare/pkg/util/httpgrpc"
"github.com/grafana/phlare/pkg/util/validation"
)

func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
c *connect.Request[querierv1.SelectMergeStacktracesRequest]) (
*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
g.SetLimit(maxConcurrent)
}

m := phlaremodel.NewFlameGraphMerger()
interval := validation.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
intervals := NewTimeIntervalIterator(time.UnixMilli(c.Msg.Start), time.UnixMilli(c.Msg.End), interval)

for intervals.Next() {
r := intervals.At()
g.Go(func() error {
req := connectgrpc.CloneRequest(c, &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: c.Msg.ProfileTypeID,
LabelSelector: c.Msg.LabelSelector,
Start: r.Start.UnixMilli(),
End: r.End.UnixMilli(),
MaxNodes: c.Msg.MaxNodes,
})
resp, err := connectgrpc.RoundTripUnary[
querierv1.SelectMergeStacktracesRequest,
querierv1.SelectMergeStacktracesResponse](ctx, f, req)
if err != nil {
return err
}
m.MergeFlameGraph(resp.Msg.Flamegraph)
return nil
})
}

if err = g.Wait(); err != nil {
return nil, err
}

return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: m.FlameGraph(c.Msg.GetMaxNodes()),
}), nil
}
64 changes: 64 additions & 0 deletions pkg/frontend/frontend_select_series.go
@@ -0,0 +1,64 @@
package frontend

import (
"context"
"net/http"
"time"

"github.com/bufbuild/connect-go"
"github.com/grafana/dskit/tenant"
"golang.org/x/sync/errgroup"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/util/connectgrpc"
"github.com/grafana/phlare/pkg/util/httpgrpc"
"github.com/grafana/phlare/pkg/util/validation"
)

func (f *Frontend) SelectSeries(ctx context.Context,
c *connect.Request[querierv1.SelectSeriesRequest]) (
*connect.Response[querierv1.SelectSeriesResponse], error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
g.SetLimit(maxConcurrent)
}

m := phlaremodel.NewSeriesMerger(false)
interval := validation.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
intervals := NewTimeIntervalIterator(time.UnixMilli(c.Msg.Start), time.UnixMilli(c.Msg.End), interval,
WithAlignment(time.Second*time.Duration(c.Msg.Step)))

for intervals.Next() {
r := intervals.At()
g.Go(func() error {
req := connectgrpc.CloneRequest(c, &querierv1.SelectSeriesRequest{
ProfileTypeID: c.Msg.ProfileTypeID,
LabelSelector: c.Msg.LabelSelector,
Start: r.Start.UnixMilli(),
End: r.End.UnixMilli(),
GroupBy: c.Msg.GroupBy,
Step: c.Msg.Step,
})
resp, err := connectgrpc.RoundTripUnary[
querierv1.SelectSeriesRequest,
querierv1.SelectSeriesResponse](ctx, f, req)
if err != nil {
return err
}
m.MergeSeries(resp.Msg.Series)
return nil
})
}

if err = g.Wait(); err != nil {
return nil, err
}

return connect.NewResponse(&querierv1.SelectSeriesResponse{Series: m.Series()}), nil
}
14 changes: 14 additions & 0 deletions pkg/frontend/frontend_series.go
@@ -0,0 +1,14 @@
package frontend

import (
"context"

"github.com/bufbuild/connect-go"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
)

func (f *Frontend) Series(ctx context.Context, c *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
return connectgrpc.RoundTripUnary[querierv1.SeriesRequest, querierv1.SeriesResponse](ctx, f, c)
}
8 changes: 7 additions & 1 deletion pkg/frontend/frontend_test.go
Expand Up @@ -71,7 +71,7 @@ func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.R
cfg.Port = port

logger := log.NewLogfmtLogger(os.Stdout)
f, err := NewFrontend(cfg, logger, reg)
f, err := NewFrontend(cfg, mockLimits{}, logger, reg)
require.NoError(t, err)

frontendpbconnect.RegisterFrontendForQuerierHandler(mux, f)
Expand Down Expand Up @@ -338,6 +338,12 @@ func TestFrontendFailedCancellation(t *testing.T) {
})
}

type mockLimits struct{}

func (mockLimits) QuerySplitDuration(string) time.Duration { return 0 }

func (mockLimits) MaxQueryParallelism(string) int { return 1 }

type mockScheduler struct {
t *testing.T
f *Frontend
Expand Down

0 comments on commit 4787ad4

Please sign in to comment.