Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributing sum queries #1878

Merged
merged 26 commits into from Feb 21, 2020

Conversation

owen-d
Copy link
Contributor

@owen-d owen-d commented Dec 4, 2019

What

This PR (squashed from #1730 for simplicity) introduces a new query-frontend flag, querier.sum-shards, which allows cortex to fan out and process queries in parallel where appropriate. In order for a query to be parallelizeable, the query itself must be parallelizeable and it must use a sharded schema (v10+).

Sharding

The v10 schema introduced a shard factor which spreads series across n shards. This is a prerequisite for allowing us to query data from these shards in parallel.

Problem

Although the v10 schema change introduces a shard factor, all shards must still be processed on a single querier. This is compounded by the fact that query ASTs are not broken down before execution. Therefore, while sharding lets us split up and process data at it's source location, we still re-aggregate it and process it all in one place. The two goals of this PR are to fix these by splitting up child ASTs into n merge-able queries for individual execution, where n = shard factor.

Details

Mapping ASTs

Firstly, we introduce a package called astmapper with the interface:

// ASTMapper is the exported interface for mapping between multiple AST representations
type ASTMapper interface {
	Map(node promql.Node) (promql.Node, error)
}

This interface is used to map ASTs into different ASTs. We use this to turn i.e.

sum by (foo) (rate(bar{baz=”blip”}[1m])) into approximately

sum by (foo) (
  sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”0of16”}[1m])) or
  sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”1of16”}[1m])) or
  ...
  sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”15of16”}[1m]))
)

There are more complex optimizations to our sum mapping, but this is a simplified example for the PR description.

This works largely because summing the sub-sums is equivalent. The principle should hold true for any merge operation that is associative, but sums are the initial focus.

Configuration

querier.sum-shard (bool)

This flag enables shard summing behavior

Schema config on frontend

The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the config-yaml CLI flag). This is the same schema config the queriers consume. Queries are currently shardable when they only span a single shard-enabled schema definition (v10+) although this could be improved in later work.

Concurrency controls

It's also advised to increase downstream concurrency controls as well to account for more queries of smaller sizes:

  • querier.max-outstanding-requests-per-tenant
  • querier.max-query-parallelism
  • querier.max-concurrent
  • server.grpc-max-concurrent-streams

It is also worth noting that instrumentation (traces) also scale with the number of sharded queries. We've had success scaling up our JAEGER_REPORTER_MAX_QUEUE_SIZE in order to handle the new volume.

Caveats

Work distribution

With query sharding enabled, the query-frontends do more work than previously. This runs afoul of the fact that they aren't horizontally scalable yet (per tenant queues are per process). Therefore, it's highly suggested to vertically scale them compared to before. In the future, this can be mitigated by the following:

  • making the query-frontends horizontally scalable (co-opt @pracucci's centralized limits or similar for the frontends)
  • Support recursive parallelization for queriers (detailed later).

parallelization vs ast delegation

In addition to parallelizing sum operations, this PR still distributes non-parallelized operations to downstream queriers. In the following example, we parallelize the sum(rate...)) and distribute the largest possible child ast (the sum(histogram_quantile...)) in this case) to a querier.

sum(histogram_quantile(0.5, rate(selector[1m]))) +
sum(rate(selector[1m]))

->
shard sum(rate(selector[1m])) and delegate sum(histogram_quantile(0.5, rate(selector[1m]))) to a querier.

Ideas for further improvements:

Parallelize merge operations

This PR only parallelizes sums, but it should be possible for any associative merge operation. Some examples could be:

  • avg(selector) -> sum(selector)/count(selector), which can be parallelized in turn by their shard factors
  • rate(selector[1m]) -> rate(selector{shard=0}[1m]) + rate(selector{shard=1}[1m])

pipe dream: recursive parallelization

Theoretically, queriers could perform a single aggregation and parallelize/recursively delegate to other queriers. It would likely require a planner/scheduler and could be extended to support intermediate caching. However, it is beyond the scope of this PR.

Benchmarks

Here are some benchmark results for different query types, shard factors, and artificial delay introduced (to simulate downstream executions):

go test -test.bench BenchmarkQuerySharding
goos: darwin
goarch: amd64
pkg: github.com/cortexproject/cortex/pkg/querier/queryrange
BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[1]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        2         668514448 ns/op
BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[2]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        3         345328972 ns/op
BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[4]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        6         173694813 ns/op

BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[1]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     1        1120235887 ns/op
BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[2]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     2         576303220 ns/op
BenchmarkQuerySharding/desc:[sum_nogroup]---shards:[4]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     4         289560953 ns/op

--------------------------------

BenchmarkQuerySharding/desc:[sum_by]---shards:[1]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                             2         807234470 ns/op
BenchmarkQuerySharding/desc:[sum_by]---shards:[2]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                             3         398730618 ns/op
BenchmarkQuerySharding/desc:[sum_by]---shards:[4]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                             5         202423816 ns/op

BenchmarkQuerySharding/desc:[sum_by]---shards:[1]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                          1        1250864781 ns/op
BenchmarkQuerySharding/desc:[sum_by]---shards:[2]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                          2         624703704 ns/op
BenchmarkQuerySharding/desc:[sum_by]---shards:[4]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                          4         324303066 ns/op

--------------------------------

BenchmarkQuerySharding/desc:[sum_without]---shards:[1]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        2         893648764 ns/op
BenchmarkQuerySharding/desc:[sum_without]---shards:[2]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        2         777097324 ns/op
BenchmarkQuerySharding/desc:[sum_without]---shards:[4]---series:[4096]---delayPerSeries:[0s]---samplesPerSeries:[100]-8                        2         627139368 ns/op

BenchmarkQuerySharding/desc:[sum_without]---shards:[1]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     1        1367469450 ns/op
BenchmarkQuerySharding/desc:[sum_without]---shards:[2]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     1        1012079071 ns/op
BenchmarkQuerySharding/desc:[sum_without]---shards:[4]---series:[4096]---delayPerSeries:[100µs]---samplesPerSeries:[100]-8                     2         791027808 ns/op

--------------------------------

PASS

Tooling:

I've also added a little tool, query-audit, which can run a series of queries against multiple prometheus backends and compare the results (I can add make targets if requested). I've used this to empirically test differences in query results across sharded and non-shard enabled backends. Output is below.

$ go install ./tools/query-audit/ && query-audit -f ~/grafana/tmp/equivalence-config.yaml

0.000000% avg diff for:
        query: sum(rate(container_cpu_usage_seconds_total[5m]))
        series: 1
        samples: 289
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-28 00:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)
        series: 95
        samples: 25877
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-28 00:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)
        series: 4308
        samples: 374989
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-26 00:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))
        series: 13
        samples: 325
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-25 06:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))
        series: 21
        samples: 525
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-25 06:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])
        series: 942
        samples: 23550
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-25 06:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum by (namespace) (predict_linear(container_cpu_usage_seconds_total[5m], 10))
        series: 16
        samples: 400
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-25 06:00:00 +0000 UTC
        step: 15m0s

0.000000% avg diff for:
        query: sum by (namespace) (avg_over_time((rate(container_cpu_usage_seconds_total[5m]))[10m:]) > 1)
        series: 4
        samples: 52
        start: 2019-11-25 00:00:00 +0000 UTC
        end: 2019-11-25 01:00:00 +0000 UTC
        step: 5m0s

@owen-d owen-d mentioned this pull request Dec 4, 2019
5 tasks
Copy link
Contributor

@rfratto rfratto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really exciting, @owen-d! I've only had time to do a really quick pass with only a few questions. I still have review the astmapper as a whole which I'll do in a second pass soon.

pkg/chunk/schema_config.go Outdated Show resolved Hide resolved
pkg/chunk/series_store.go Show resolved Hide resolved
pkg/cortex/modules.go Outdated Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from 66b3dd8 to 716bf53 Compare January 13, 2020 22:41
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch 3 times, most recently from 0d64811 to 1cd3224 Compare January 17, 2020 13:59
Copy link
Contributor

@cyriltovena cyriltovena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, found some nits, but overall I think this is pretty much ready.

Once you've went through all feedbacks, I think we should get @pracucci to give a review.

This is a very big PR to review, and even when you know about the whole idea.

@cyriltovena
Copy link
Contributor

We might also want to add some metrics instrumentations to know how many requests has been // by shard sums, or even tag an existing metrics to compare shard vs non shard in the past, up to you.

@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from 1cd3224 to 1a3be40 Compare January 21, 2020 18:35
Copy link
Contributor

@jtlisi jtlisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks good. The AST package seems to have bulk of the logic and looks solid. I have a few nits and questions.

Also was there a reason to move so many files into separate packages? In general, I agree the code base could use some refactoring. However, it makes it difficult to review the PR when there are so many non-related changes.

pkg/ingester/ingester.go Outdated Show resolved Hide resolved
pkg/chunk/chunk_store_utils.go Outdated Show resolved Hide resolved
pkg/chunk/schema.go Outdated Show resolved Hide resolved
pkg/chunk/schema_util.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
pkg/querier/queryrange/series.go Outdated Show resolved Hide resolved
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from 1c8be61 to 269b387 Compare January 28, 2020 20:55
@jtlisi
Copy link
Contributor

jtlisi commented Jan 30, 2020

@owen-d can you rebase this on to master? The vendored version of prometheus recently changed and I think some variable names you use in astmapper need to be updated.

@owen-d owen-d force-pushed the feature/query-sharding-squashed branch 3 times, most recently from 9360cea to bd1e33b Compare February 3, 2020 17:49
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from fb6f97e to 4c6f40b Compare February 10, 2020 22:34
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This work is amazing @owen-d! I particularly appreciated the very detailed PR description and the extensive tests coverage. The implementation is also very clear, despite the intrinsic complexy 👏

I know you've heard this request a bunch of times, but I would be glad if you could rebase master. In the last few days we've merged new integration tests and goimports changes. I would like to make sure CI pass in the last of the recent changes too.

The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the config-yaml CLI flag).

What's the impact of this requirement on projects using the query-frontend standalone (ie. in front of Thanos)?

CHANGELOG.md Outdated Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
docs/configuration/arguments.md Show resolved Hide resolved
docs/configuration/arguments.md Show resolved Hide resolved
docs/configuration/arguments.md Outdated Show resolved Hide resolved
pkg/querier/queryrange/querysharding.go Outdated Show resolved Hide resolved
pkg/querier/queryrange/querysharding.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/parallel.go Outdated Show resolved Hide resolved
…rding-squashed

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
@owen-d
Copy link
Contributor Author

owen-d commented Feb 20, 2020

Hey @pracucci, thanks for the kind words and the review. I've addressed all but one of your comments which I've pushed back on.

Regarding

The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the config-yaml CLI flag).

This is only required if one wishes to use the new optimizations. If no config file is specified, the configuration is loaded via flags, which by default adds a v1 schema.

However, if a config file is specified, but it doesn't have any period configs, loading the frontend will now error. Should this case be handled differently or is this the right logic?

Edit: I was mistaken. All looks good wrt config loading.

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from b94c215 to 10dd597 Compare February 20, 2020 00:34
@pracucci
Copy link
Contributor

However, if a config file is specified, but it doesn't have any period configs, loading the frontend will now error. Should this case be handled differently or is this the right logic?

This would be a problem both because of 3rd parties using the query-frontend in front of non-Cortex instances (ie. Thanos) and because of the new blocks storage (you can run the query-frontend in front of Cortex with the blocks storage even if the sum shard won't be supported).

However, I can't find a way to reproduce it and even looking at the code it's not clear to me which specific change has introduced this "breaking change". Could you share more details, please?

Copy link
Contributor

@gouthamve gouthamve left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally managed to put the time to review this. Sorry for the delay, this PR is really good! I mostly only have stylistic changes. You've been doing a bunch of those, but I hope the ones I have are the final ones.

Regarding the configs breaking change, I too couldn't find it. cfg.Schema.Load() will return nil if there are no configs :)

pkg/chunk/schema.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/astmapper.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/astmapper.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/parallel.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/subtree_folder.go Outdated Show resolved Hide resolved
pkg/querier/queryrange/queryable.go Outdated Show resolved Hide resolved
pkg/querier/queryrange/queryable.go Outdated Show resolved Hide resolved
pkg/querier/queryrange/value.go Outdated Show resolved Hide resolved
pkg/querier/astmapper/embedded.go Outdated Show resolved Hide resolved
@pracucci
Copy link
Contributor

Regarding the configs breaking change, I too couldn't find it. cfg.Schema.Load() will return nil if there are no configs :)

I've enhanced the query-frontend tests in #2167 (pending review) and I've also locally merged the PR #2167 into this PR and re-run integration tests: all green 💚.

Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given I believe there's no breaking change in the config file and the rest of changes LGTM, I'm happy to announce it's approved from my side 🎉 Again, good job!

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
@owen-d owen-d force-pushed the feature/query-sharding-squashed branch from d6253a9 to 65718db Compare February 20, 2020 16:22
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job on the doc! I left few stylistic comments.

docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
docs/operations/query-auditor.md Outdated Show resolved Hide resolved
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed latest changes and LGTM! 🎉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants