Skip to content

Commit

Permalink
Move splitter to own type
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
  • Loading branch information
Danny Kopping committed Jan 3, 2024
1 parent 628cace commit 11c9f8a
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 196 deletions.
52 changes: 27 additions & 25 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,46 +153,48 @@ func NewMiddleware(

var codec base.Codec = DefaultCodec

indexStatsTripperware, err := NewIndexStatsTripperware(cfg, iqo, log, limits, schema, codec, statsCache,
split := newDefaultSplitter(limits, iqo)

indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache,
cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}

metricsTripperware, err := NewMetricTripperware(cfg, iqo, engineOpts, log, limits, schema, codec, resultsCache,
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}

limitedTripperware, err := NewLimitedTripperware(cfg, iqo, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split)
if err != nil {
return nil, nil, err
}

// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, iqo, engineOpts, log, limits, schema, codec, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}

seriesTripperware, err := NewSeriesTripperware(cfg, iqo, log, limits, metrics, schema, DefaultCodec, metricsNamespace)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, DefaultCodec, split, metricsNamespace)
if err != nil {
return nil, nil, err
}

labelsTripperware, err := NewLabelsTripperware(cfg, iqo, log, limits, codec, metrics, schema, metricsNamespace)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, metrics, schema, metricsNamespace)
if err != nil {
return nil, nil, err
}

instantMetricTripperware, err := NewInstantMetricTripperware(cfg, iqo, engineOpts, log, limits, schema, metrics, indexStatsTripperware, metricsNamespace)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}

seriesVolumeTripperware, err := NewVolumeTripperware(cfg, iqo, log, limits, schema, codec, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -383,7 +385,7 @@ func getOperation(path string) string {
}

// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests.
func NewLogFilterTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand All @@ -392,7 +394,7 @@ func NewLogFilterTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOp
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cfg.CacheResults {
Expand Down Expand Up @@ -447,7 +449,7 @@ func NewLogFilterTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOp
}

// NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression.
func NewLimitedTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger) (base.Middleware, error) {
func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand All @@ -461,7 +463,7 @@ func NewLimitedTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts
// potentially GB of logs being returned by all the shards and splits which will overwhelm the frontend
// Therefore we force max parallelism to one so that these queries are executed sequentially.
// Below we also fix the number of shards to a static number.
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, 1), merger, splitByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, 1), merger, split, metrics.SplitByMetrics),
NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
}

Expand All @@ -473,15 +475,15 @@ func NewLimitedTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts
}

// NewSeriesTripperware creates a new frontend tripperware responsible for handling series requests
func NewSeriesTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Logger, limits Limits, metrics *Metrics, schema config.SchemaConfig, merger base.Merger, metricsNamespace string) (base.Middleware, error) {
func NewSeriesTripperware(cfg Config, log log.Logger, limits Limits, metrics *Metrics, schema config.SchemaConfig, merger base.Merger, split splitter, metricsNamespace string) (base.Middleware, error) {
queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks.
// Force a 24 hours split by for series API, this will be more efficient with our static daily bucket storage.
// This would avoid queriers downloading chunks for same series over and over again for serving smaller queries.
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), merger, splitByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), merger, split, metrics.SplitByMetrics),
}

if cfg.MaxRetries > 0 {
Expand Down Expand Up @@ -510,14 +512,14 @@ func NewSeriesTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Log
}

// NewLabelsTripperware creates a new frontend tripperware responsible for handling labels requests.
func NewLabelsTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Logger, limits Limits, merger base.Merger, metrics *Metrics, schema config.SchemaConfig, metricsNamespace string) (base.Middleware, error) {
func NewLabelsTripperware(cfg Config, log log.Logger, limits Limits, merger base.Merger, split splitter, metrics *Metrics, schema config.SchemaConfig, metricsNamespace string) (base.Middleware, error) {
queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage.
// This is because the labels API is an index-only operation.
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), merger, splitByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), merger, split, metrics.SplitByMetrics),
}

if cfg.MaxRetries > 0 {
Expand All @@ -534,7 +536,7 @@ func NewLabelsTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Log
}

// NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewMetricTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
var queryCacheMiddleware base.Middleware
if cfg.CacheResults {
Expand Down Expand Up @@ -588,7 +590,7 @@ func NewMetricTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitMetricByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
)

if cfg.CacheResults {
Expand Down Expand Up @@ -644,7 +646,7 @@ func NewMetricTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts
}

// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewInstantMetricTripperware(cfg Config, iqo util.IngesterQueryOptions, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand Down Expand Up @@ -686,7 +688,7 @@ func NewInstantMetricTripperware(cfg Config, iqo util.IngesterQueryOptions, engi
}), nil
}

func NewVolumeTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
// Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
// Indices are sharded by 24 hours, so we split the volume request in 24h intervals.
limits = WithSplitByLimits(limits, 24*time.Hour)
Expand Down Expand Up @@ -724,8 +726,8 @@ func NewVolumeTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Log
indexTw, err := sharedIndexTripperware(
cacheMiddleware,
cfg,
iqo,
merger,
split,
limits,
log,
metrics,
Expand Down Expand Up @@ -794,7 +796,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M
})
}

func NewIndexStatsTripperware(cfg Config, iqo util.IngesterQueryOptions, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
// Parallelize the index stats requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
// Indices are sharded by 24 hours, so we split the stats request in 24h intervals.
limits = WithSplitByLimits(limits, 24*time.Hour)
Expand Down Expand Up @@ -833,8 +835,8 @@ func NewIndexStatsTripperware(cfg Config, iqo util.IngesterQueryOptions, log log
tw, err := sharedIndexTripperware(
cacheMiddleware,
cfg,
iqo,
merger,
split,
limits,
log,
metrics,
Expand All @@ -851,8 +853,8 @@ func NewIndexStatsTripperware(cfg Config, iqo util.IngesterQueryOptions, log log
func sharedIndexTripperware(
cacheMiddleware base.Middleware,
cfg Config,
iqo util.IngesterQueryOptions,
merger base.Merger,
split splitter,
limits Limits,
log log.Logger,
metrics *Metrics,
Expand All @@ -863,7 +865,7 @@ func sharedIndexTripperware(
middlewares := []base.Middleware{
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime(limits, iqo), metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cacheMiddleware != nil {
Expand Down

0 comments on commit 11c9f8a

Please sign in to comment.