Lower contention on requests with many aggs (backport of #66895) (#66941)

This lowers the contention on the `REQUEST` circuit breaker when building
many aggregations on many threads by preallocating a chunk of the breaker
up front. This cuts down on the number of times we enter the busy loop
in `ChildMemoryCircuitBreaker.limit`. Now we hit it once when building
aggregations. We still hit the busy loop if we collect many buckets.

We let the `AggregationBuilder` pick the size of the "chunk" that we
preallocate, but it doesn't have much to go on - not even the field types.
Still, it is available in a convenient spot and the estimates don't have
to be particularly accurate.
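The shape of the optimization can be sketched roughly like this (a simplified model, not the real `PreallocatedCircuitBreakerService`; the class and method names here are illustrative):

```java
// Illustrative sketch: a shared breaker whose addEstimate is a contended
// compare-and-set loop, and a per-request wrapper that charges it once up
// front, then serves requests from its local reservation without touching
// the shared counter. Names like SharedBreaker are hypothetical, not
// Elasticsearch classes.
import java.util.concurrent.atomic.AtomicLong;

class SharedBreaker {
    private final AtomicLong used = new AtomicLong();
    private final long limit;

    SharedBreaker(long limit) {
        this.limit = limit;
    }

    // The contended path: every caller retries this CAS loop.
    void addEstimate(long bytes) {
        long prev;
        do {
            prev = used.get();
            if (prev + bytes > limit) {
                throw new IllegalStateException("circuit broken");
            }
        } while (used.compareAndSet(prev, prev + bytes) == false);
    }

    void release(long bytes) {
        used.addAndGet(-bytes);
    }

    long usedBytes() {
        return used.get();
    }
}

class PreallocatedBreaker implements AutoCloseable {
    private final SharedBreaker shared;
    private long sharedCharged; // everything charged to the shared breaker
    private long localFree;     // preallocated bytes not yet handed out

    PreallocatedBreaker(SharedBreaker shared, long bytes) {
        shared.addEstimate(bytes); // one trip through the contended loop
        this.shared = shared;
        this.sharedCharged = bytes;
        this.localFree = bytes;
    }

    // Used by one request (one thread) at a time, so plain fields suffice.
    void addEstimate(long bytes) {
        if (bytes <= localFree) {
            localFree -= bytes;        // cheap, uncontended bookkeeping
        } else {
            shared.addEstimate(bytes); // overflow falls back to the shared loop
            sharedCharged += bytes;
        }
    }

    @Override
    public void close() {
        shared.release(sharedCharged); // hand everything back at request end
    }
}
```

The point is that a request building sixty aggregators takes one trip through the contended loop instead of sixty; only allocations that blow past the estimate fall back to the shared counter.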

The benchmarks on my 12 core desktop are interesting:
```
Benchmark         (breaker)  Mode  Cnt    Score    Error  Units
sum                    noop  avgt   10    1.672 ±  0.042  us/op
sum                    real  avgt   10    4.100 ±  0.027  us/op
sum             preallocate  avgt   10    4.230 ±  0.034  us/op
termsSixtySums         noop  avgt   10   92.658 ±  0.939  us/op
termsSixtySums         real  avgt   10  278.764 ± 39.751  us/op
termsSixtySums  preallocate  avgt   10  120.896 ± 16.097  us/op
termsSum               noop  avgt   10    4.573 ±  0.095  us/op
termsSum               real  avgt   10    9.932 ±  0.211  us/op
termsSum        preallocate  avgt   10    7.695 ±  0.313  us/op
```

They show pretty clearly that not using the circuit breaker at all is
fastest. But we can't do that because we don't want to bring the node
down on bad aggs. When there are many aggs (termsSixtySums) the
preallocation claws back much of the performance. It even helps
marginally when there are two aggs (termsSum). For a single agg (sum)
we see a 130 nanosecond hit. Fine.

But these values are all pretty small. At best we're seeing a 160
microsecond savings. Not so on a 160 vCPU machine:

```
Benchmark         (breaker)  Mode  Cnt      Score       Error  Units
sum                    noop  avgt   10     44.956 ±     8.851  us/op
sum                    real  avgt   10    118.008 ±    19.505  us/op
sum             preallocate  avgt   10    241.234 ±   305.998  us/op
termsSixtySums         noop  avgt   10   1339.802 ±    51.410  us/op
termsSixtySums         real  avgt   10  12077.671 ± 12110.993  us/op
termsSixtySums  preallocate  avgt   10   3804.515 ±  1458.702  us/op
termsSum               noop  avgt   10     59.478 ±     2.261  us/op
termsSum               real  avgt   10    293.756 ±   253.854  us/op
termsSum        preallocate  avgt   10    197.963 ±    41.578  us/op
```

All of these numbers are larger because we're running all the CPUs
flat out and we're seeing more contention everywhere. Even the "noop"
breaker sees some contention, but I think it is mostly around memory
allocation. Anyway, with many, many aggs (termsSixtySums) we're looking
at 8 milliseconds of savings by preallocating, just by dodging the busy
loop as much as possible. The error in the measurements there is
substantial. Here are the runs:
```
real:
Iteration   1: 8679.417 ±(99.9%) 273.220 us/op
Iteration   2: 5849.538 ±(99.9%) 179.258 us/op
Iteration   3: 5953.935 ±(99.9%) 152.829 us/op
Iteration   4: 5763.465 ±(99.9%) 150.759 us/op
Iteration   5: 14157.592 ±(99.9%) 395.224 us/op
Iteration   1: 24857.020 ±(99.9%) 1133.847 us/op
Iteration   2: 24730.903 ±(99.9%) 1107.718 us/op
Iteration   3: 18894.383 ±(99.9%) 738.706 us/op
Iteration   4: 5493.965 ±(99.9%) 120.529 us/op
Iteration   5: 6396.493 ±(99.9%) 143.630 us/op
preallocate:
Iteration   1: 5512.590 ±(99.9%) 110.222 us/op
Iteration   2: 3087.771 ±(99.9%) 120.084 us/op
Iteration   3: 3544.282 ±(99.9%) 110.373 us/op
Iteration   4: 3477.228 ±(99.9%) 107.270 us/op
Iteration   5: 4351.820 ±(99.9%) 82.946 us/op
Iteration   1: 3185.250 ±(99.9%) 154.102 us/op
Iteration   2: 3058.000 ±(99.9%) 143.758 us/op
Iteration   3: 3199.920 ±(99.9%) 61.589 us/op
Iteration   4: 3163.735 ±(99.9%) 71.291 us/op
Iteration   5: 5464.556 ±(99.9%) 59.034 us/op
```

That variability from 5.5ms to 25ms is terrible. It makes me not
particularly trust the 8ms savings from the report. But still,
the preallocating method has much less variability between runs
and almost all of its runs are faster than all of the non-preallocated
runs. Maybe the savings is more like 2 or 3 milliseconds, but still.
Or maybe we should think of the savings as worst vs worst? If so it's
19 milliseconds.

Anyway, it's hard to measure how much this helps. But it certainly
helps some.

Closes #58647
nik9000 committed Jan 4, 2021
1 parent 081600d commit 4fd9b1d
Showing 16 changed files with 905 additions and 61 deletions.
@@ -0,0 +1,356 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.benchmark.search.aggregations;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.Version;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.BucketedSort.ExtraData;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * Benchmarks the overhead of constructing {@link Aggregator}s in many
 * parallel threads. Machines with different numbers of cores will see
 * wildly different results running this, with more cores seeing more
 * benefit from preallocation.
 */
@Fork(2)
@Warmup(iterations = 10)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Threads(Threads.MAX)
public class AggConstructionContentionBenchmark {
    private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, org.elasticsearch.common.collect.List.of());
    private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY);
    private final Index index = new Index("test", "uuid");
    private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(
        Settings.EMPTY,
        new IndexFieldDataCache.Listener() {
        }
    );

    private CircuitBreakerService breakerService;
    private BigArrays bigArrays;
    private boolean preallocateBreaker;

    @Param({ "noop", "real", "preallocate" })
    private String breaker;

    @Setup
    public void setup() {
        switch (breaker) {
            case "real":
                breakerService = new HierarchyCircuitBreakerService(
                    Settings.EMPTY,
                    org.elasticsearch.common.collect.List.of(),
                    clusterSettings
                );
                break;
            case "preallocate":
                preallocateBreaker = true;
                breakerService = new HierarchyCircuitBreakerService(
                    Settings.EMPTY,
                    org.elasticsearch.common.collect.List.of(),
                    clusterSettings
                );
                break;
            case "noop":
                breakerService = new NoneCircuitBreakerService();
                break;
            default:
                throw new UnsupportedOperationException();
        }
        bigArrays = new BigArrays(recycler, breakerService, "request");
    }

    @Benchmark
    public void sum() throws IOException {
        buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1")));
    }

    @Benchmark
    public void termsSum() throws IOException {
        buildFactories(
            new AggregatorFactories.Builder().addAggregator(
                new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2"))
            )
        );
    }

    @Benchmark
    public void termsSixtySums() throws IOException {
        TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1");
        for (int i = 0; i < 60; i++) {
            b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i));
        }
        buildFactories(new AggregatorFactories.Builder().addAggregator(b));
    }

    private void buildFactories(AggregatorFactories.Builder factories) throws IOException {
        try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) {
            factories.build(context, null).createTopLevelAggregators();
        }
    }

    private class DummyAggregationContext extends AggregationContext {
        private final Query query = new MatchAllDocsQuery();
        private final List<Releasable> releaseMe = new ArrayList<>();

        private final CircuitBreaker breaker;
        private final PreallocatedCircuitBreakerService preallocated;
        private final MultiBucketConsumer multiBucketConsumer;

        DummyAggregationContext(long bytesToPreallocate) {
            CircuitBreakerService breakerService;
            if (preallocateBreaker) {
                breakerService = preallocated = new PreallocatedCircuitBreakerService(
                    AggConstructionContentionBenchmark.this.breakerService,
                    CircuitBreaker.REQUEST,
                    bytesToPreallocate,
                    "aggregations"
                );
            } else {
                breakerService = AggConstructionContentionBenchmark.this.breakerService;
                preallocated = null;
            }
            breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
            multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker);
        }

        @Override
        public Query query() {
            return query;
        }

        @Override
        public Aggregator profileIfEnabled(Aggregator agg) throws IOException {
            return agg;
        }

        @Override
        public boolean profiling() {
            return false;
        }

        @Override
        public long nowInMillis() {
            return 0;
        }

        @Override
        protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
            IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
            }, index, ft.name());
            return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService);
        }

        @Override
        public MappedFieldType getFieldType(String path) {
            if (path.startsWith("int")) {
                return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER);
            }
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isFieldMapped(String field) {
            return field.startsWith("int");
        }

        @Override
        public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryType> context) {
            throw new UnsupportedOperationException();
        }

        @Override
        public SearchLookup lookup() {
            throw new UnsupportedOperationException();
        }

        @Override
        public ValuesSourceRegistry getValuesSourceRegistry() {
            return searchModule.getValuesSourceRegistry();
        }

        @Override
        public BigArrays bigArrays() {
            return bigArrays;
        }

        @Override
        public IndexSearcher searcher() {
            return null;
        }

        @Override
        public Query buildQuery(QueryBuilder builder) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public IndexSettings getIndexSettings() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sortBuilders) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public ObjectMapper getObjectMapper(String path) {
            throw new UnsupportedOperationException();
        }

        @Override
        public NestedScope nestedScope() {
            throw new UnsupportedOperationException();
        }

        @Override
        public SubSearchContext subSearchContext() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addReleasable(Aggregator aggregator) {
            releaseMe.add(aggregator);
        }

        @Override
        public MultiBucketConsumer multiBucketConsumer() {
            return multiBucketConsumer;
        }

        @Override
        public BitsetFilterCache bitsetFilterCache() {
            throw new UnsupportedOperationException();
        }

        @Override
        public BucketedSort buildBucketedSort(SortBuilder<?> sort, int size, ExtraData values) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public int shardRandomSeed() {
            return 0;
        }

        @Override
        public long getRelativeTimeInMillis() {
            return 0;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public CircuitBreaker breaker() {
            return breaker;
        }

        @Override
        public Analyzer getIndexAnalyzer(Function<String, NamedAnalyzer> unindexedFieldAnalyzer) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCacheable() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Version indexVersionCreated() {
            return Version.CURRENT;
        }

        @Override
        public void close() {
            List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
            releaseMe.add(preallocated);
            Releasables.close(releaseMe);
        }
    }
}
@@ -536,6 +536,11 @@ public BucketCardinality bucketCardinality() {
    public String getType() {
        return "test";
    }

    @Override
    public long bytesToPreallocate() {
        return 0;
    }
}

/**
@@ -570,13 +575,13 @@ public Aggregator subAggregator(String name) {
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[] {
new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap())
buildEmptyAggregation()
};
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap());
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null);
}

@Override
@@ -624,7 +624,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
.addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
logger.info("--> got an expected exception", e);
assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]"));

client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
