From 468cf11006c94e35839f86f25bd799e034b46f1d Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 16 Sep 2019 17:09:13 +0800 Subject: [PATCH 1/4] bucket aggregation circuit breaker optimization --- .../common/settings/ClusterSettings.java | 1 + .../HierarchyCircuitBreakerService.java | 4 ++ .../java/org/elasticsearch/node/Node.java | 7 +-- .../elasticsearch/search/SearchService.java | 5 ++- .../MultiBucketConsumerService.java | 37 +++++++++++++--- .../HierarchyCircuitBreakerServiceTests.java | 26 +++++++++++ .../snapshots/SnapshotResiliencyTests.java | 2 +- .../java/org/elasticsearch/node/MockNode.java | 38 ++++++++-------- .../search/MockSearchService.java | 7 +-- .../aggregations/AggregatorTestCase.java | 44 +++++++++++-------- .../test/InternalAggregationTestCase.java | 7 ++- 11 files changed, 124 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 92332225ed5c8..b118a0f181b0a 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, MultiBucketConsumerService.MAX_BUCKET_SETTING, + MultiBucketConsumerService.CHECK_BUCKETS_STEP_SIZE_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, Node.WRITE_PORTS_FILE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 5797843161c59..f752c714d032c 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -302,6 +302,10 @@ long currentMemoryUsage() { } } + public long getParentLimit() { + return this.parentSettings.getLimit(); + } + /** * Checks whether the parent breaker has been tripped */ diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 50cb468b48744..570430b68facb 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -514,7 +514,7 @@ protected Node( final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), - responseCollectorService); + responseCollectorService, circuitBreakerService); final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() @@ -991,9 +991,10 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) { */ protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, + CircuitBreakerService circuitBreakerService) { return new SearchService(clusterService, indicesService, threadPool, - scriptService, bigArrays, fetchPhase, responseCollectorService); + scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService); } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 7f2f57801d203..0e51cb4cb3164 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -58,6 +58,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.script.FieldScript; @@ -194,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public SearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase, - ResponseCollectorService responseCollectorService) { + ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; this.clusterService = clusterService; @@ -204,7 +205,7 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic this.bigArrays = bigArrays; this.queryPhase = new QueryPhase(); this.fetchPhase = fetchPhase; - this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings); + this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreakerService); TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index b302c40c3bd12..8565cf9451844 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -38,14 +40,27 @@ */ public class MultiBucketConsumerService { public static final int DEFAULT_MAX_BUCKETS = 10000; + public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 10000; public static final Setting MAX_BUCKET_SETTING = Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); - private volatile int maxBucket; + public static final Setting CHECK_BUCKETS_STEP_SIZE_SETTING = + Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE, + -1, Setting.Property.NodeScope, Setting.Property.Dynamic); + + private final CircuitBreakerService circuitBreakerService; - public MultiBucketConsumerService(ClusterService clusterService, Settings settings) { - this.maxBucket = MAX_BUCKET_SETTING.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); + private volatile int maxBucket; + private volatile int checkBucketsStepSize; + + public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) { + this.circuitBreakerService = circuitBreakerService; + this.maxBucket = MAX_BUCKET_SETTING.get(settings); + this.checkBucketsStepSize = CHECK_BUCKETS_STEP_SIZE_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECK_BUCKETS_STEP_SIZE_SETTING, value -> { + checkBucketsStepSize = value; + }); } private void setMaxBucket(int maxBucket) { @@ -94,11 +109,16 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws */ public static class MultiBucketConsumer implements IntConsumer { private final int limit; + private final int checkBucketsStepSizeLimit; + private final CircuitBreakerService circuitBreakerService; + // aggregations execute in a single thread so no atomic here private int count; - public MultiBucketConsumer(int limit) { + public MultiBucketConsumer(int limit, int checkBucketsStepSizeLimit, CircuitBreakerService circuitBreakerService) { this.limit = limit; + this.checkBucketsStepSizeLimit = checkBucketsStepSizeLimit; + this.circuitBreakerService = circuitBreakerService; } @Override @@ -109,6 +129,11 @@ public void accept(int value) { + "] but was [" + count + "]. This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); } + + if (value > 0 && this.circuitBreakerService instanceof HierarchyCircuitBreakerService + && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) { + ((HierarchyCircuitBreakerService) this.circuitBreakerService).checkParentLimit(0, "check_allocation_buckets"); + } } public void reset() { @@ -125,6 +150,6 @@ public int getLimit() { } public MultiBucketConsumer create() { - return new MultiBucketConsumer(maxBucket); + return new MultiBucketConsumer(maxBucket, checkBucketsStepSize, this.circuitBreakerService); } } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index b72d8b9800934..73e158334b5d6 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.test.ESTestCase; import java.util.concurrent.atomic.AtomicBoolean; @@ -293,6 +294,31 @@ public void testTrippedCircuitBreakerDurability() { } } + public void testAllocationBucketsBreaker() throws Exception { + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "20mb") + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "true") + .build(); + + try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + + long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit(); + assertEquals(new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(), parentLimitBytes); + + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service); + + long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage(); + if (currentMemory > parentLimitBytes) { + CircuitBreakingException exception = + expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(10000)); + assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [check_allocation_buckets] would be")); + assertThat(exception.getMessage(), containsString("which is larger than the limit of [20971520/20mb]")); + } + } + } + private long mb(long size) { return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes(); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e7e65f54bb711..33fdf9e461209 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1071,7 +1071,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService, - bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService); + bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService()); actions.put(SearchAction.INSTANCE, new TransportSearchAction(threadPool, transportService, searchService, searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService, diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index aa457978d83a5..aaaa2b6c38086 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -59,8 +59,8 @@ /** * A node for testing which allows: *
    - *
  • Overriding Version.CURRENT
  • - *
  • Adding test plugins that exist on the classpath
  • + *
  • Overriding Version.CURRENT
  • + *
  • Adding test plugins that exist on the classpath
  • *
*/ public class MockNode extends Node { @@ -72,27 +72,27 @@ public MockNode(final Settings settings, final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings) { + final Settings settings, + final Collection> classpathPlugins, + final boolean forbidPrivateIndexSettings) { this(settings, classpathPlugins, null, forbidPrivateIndexSettings); } public MockNode( - final Settings settings, - final Collection> classpathPlugins, - final Path configPath, - final boolean forbidPrivateIndexSettings) { + final Settings settings, + final Collection> classpathPlugins, + final Path configPath, + final boolean forbidPrivateIndexSettings) { this( - InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"), - classpathPlugins, - forbidPrivateIndexSettings); + InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"), + classpathPlugins, + forbidPrivateIndexSettings); } private MockNode( - final Environment environment, - final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings) { + final Environment environment, + final Collection> classpathPlugins, + final boolean forbidPrivateIndexSettings) { super(environment, classpathPlugins, forbidPrivateIndexSettings); this.classpathPlugins = classpathPlugins; } @@ -124,12 +124,14 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) { @Override protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, + CircuitBreakerService circuitBreakerService) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, - responseCollectorService); + responseCollectorService, circuitBreakerService); } - return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + return new MockSearchService(clusterService, indicesService, threadPool, scriptService, + bigArrays, fetchPhase, circuitBreakerService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index b9d9ff3cfc9bb..e26382600ab10 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.MockNode; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -67,9 +68,9 @@ static void removeActiveContext(SearchContext context) { } public MockSearchService(ClusterService clusterService, - IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, - BigArrays bigArrays, FetchPhase fetchPhase) { - super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null); + IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, + BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) { + super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 087b452232561..889bbdab05e31 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -90,6 +90,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; +import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_CHECK_BUCKETS_STEP_SIZE; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -131,7 +132,7 @@ protected A createAggregator(AggregationBuilder aggregati IndexSearcher indexSearcher, MappedFieldType... fieldTypes) throws IOException { return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()), fieldTypes); } protected A createAggregator(Query query, @@ -140,7 +141,7 @@ protected A createAggregator(Query query, IndexSettings indexSettings, MappedFieldType... fieldTypes) throws IOException { return createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()), fieldTypes); } protected A createAggregator(Query query, AggregationBuilder aggregationBuilder, @@ -195,7 +196,7 @@ public boolean shouldCache(Query query) { when(searchContext.numberOfShards()).thenReturn(1); when(searchContext.searcher()).thenReturn(contextIndexSearcher); when(searchContext.fetchPhase()) - .thenReturn(new FetchPhase(Arrays.asList(new FetchSourceSubPhase(), new DocValueFieldsFetchSubPhase()))); + .thenReturn(new FetchPhase(Arrays.asList(new FetchSourceSubPhase(), new DocValueFieldsFetchSubPhase()))); when(searchContext.bitsetFilterCache()).thenReturn(new BitsetFilterCache(indexSettings, mock(Listener.class))); CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); IndexShard indexShard = mock(IndexShard.class); @@ -254,12 +255,12 @@ public boolean shouldCache(Query query) { protected IndexSettings createIndexSettings() { return new IndexSettings( - IndexMetaData.builder("_index").settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .creationDate(System.currentTimeMillis()) - .build(), - Settings.EMPTY + IndexMetaData.builder("_index").settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY ); } @@ -274,9 +275,9 @@ protected MapperService mapperServiceMock() { * sub-tests that need a more complex mock can overwrite this */ protected QueryShardContext queryShardContextMock(IndexSearcher searcher, - MapperService mapperService, - IndexSettings indexSettings, - CircuitBreakerService circuitBreakerService) { + MapperService mapperService, + IndexSettings indexSettings, + CircuitBreakerService circuitBreakerService) { return new QueryShardContext(0, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, null, getIndexFieldDataLookup(mapperService, circuitBreakerService), @@ -314,7 +315,8 @@ protected A search(IndexSe AggregationBuilder builder, int maxBucket, MappedFieldType... fieldTypes) throws IOException { - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); a.preCollection(); searcher.search(query, a); @@ -352,20 +354,22 @@ protected A searchAndReduc final CompositeReaderContext compCTX = (CompositeReaderContext) ctx; final int size = compCTX.leaves().size(); subSearchers = new ShardSearcher[size]; - for(int searcherIDX=0;searcherIDX aggs = new ArrayList<> (); + List aggs = new ArrayList<>(); Query rewritten = searcher.rewrite(query); Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); for (ShardSearcher subSearcher : subSearchers) { - MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes); a.preCollection(); subSearcher.search(weight, a); @@ -383,7 +387,8 @@ protected A searchAndReduc Collections.shuffle(aggs, random()); int r = randomIntBetween(1, toReduceSize); List toReduce = aggs.subList(0, r); - MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, false); @@ -393,7 +398,8 @@ protected A searchAndReduc aggs.add(reduced); } // now do the final reduce - MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 2b7704964c86c..50c898ba1aad2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -159,6 +159,7 @@ public abstract class InternalAggregationTestCase extends AbstractWireSerializingTestCase { public static final int DEFAULT_MAX_BUCKETS = 100000; + public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 100000; protected static final double TOLERANCE = 1e-10; private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { @@ -269,7 +270,8 @@ public void testReduceRandom() { Collections.shuffle(toReduce, random()); int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); @SuppressWarnings("unchecked") @@ -285,7 +287,8 @@ public void testReduceRandom() { toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce.add(reduced); } - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true); @SuppressWarnings("unchecked") From b221d757b53c1e89d03c195e9f073074347bc986 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 25 Sep 2019 11:41:32 +0800 Subject: [PATCH 2/4] use request circuit breaker for bucket memory checking --- .../aggregations/MultiBucketConsumerService.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index 8565cf9451844..b41ef0ba7b1bb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -19,13 +19,13 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -40,7 +40,7 @@ */ public class MultiBucketConsumerService { public static final int DEFAULT_MAX_BUCKETS = 10000; - public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 10000; + public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 1000; public static final Setting MAX_BUCKET_SETTING = Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); @@ -130,9 +130,9 @@ public void accept(int value) { MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); } - if (value > 0 && this.circuitBreakerService instanceof HierarchyCircuitBreakerService - && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) { - ((HierarchyCircuitBreakerService) this.circuitBreakerService).checkParentLimit(0, "check_allocation_buckets"); + if (value > 0 && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) { + CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); + breaker.addEstimateBytesAndMaybeBreak(0, "check_allocation_buckets"); } } From 0df4cf4dac282aa330dab9580a07e704ecc3d93c Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 28 Oct 2019 20:49:20 +0800 Subject: [PATCH 3/4] remove check allocation buckets setting, enhanced UT --- .../common/settings/ClusterSettings.java | 1 - .../MultiBucketConsumerService.java | 20 ++++--------- .../HierarchyCircuitBreakerServiceTests.java | 25 ++++++++-------- .../java/org/elasticsearch/node/MockNode.java | 30 +++++++++---------- .../search/MockSearchService.java | 4 +-- .../aggregations/AggregatorTestCase.java | 27 +++++++++-------- .../test/InternalAggregationTestCase.java | 5 ++-- 7 files changed, 53 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 303beb17b88ff..81429e011f49c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -376,7 +376,6 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, MultiBucketConsumerService.MAX_BUCKET_SETTING, - MultiBucketConsumerService.CHECK_BUCKETS_STEP_SIZE_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, Node.WRITE_PORTS_FILE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index b41ef0ba7b1bb..eb5ae9fd1cc7d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -51,16 +51,11 @@ public class MultiBucketConsumerService { private final CircuitBreakerService circuitBreakerService; private volatile int maxBucket; - private volatile int checkBucketsStepSize; public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) { this.circuitBreakerService = circuitBreakerService; this.maxBucket = MAX_BUCKET_SETTING.get(settings); - this.checkBucketsStepSize = CHECK_BUCKETS_STEP_SIZE_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); - clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECK_BUCKETS_STEP_SIZE_SETTING, value -> { - checkBucketsStepSize = value; - }); } private void setMaxBucket(int maxBucket) { @@ -109,16 +104,14 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws */ public static class MultiBucketConsumer implements IntConsumer { private final int limit; - private final int checkBucketsStepSizeLimit; - private final CircuitBreakerService circuitBreakerService; + private final CircuitBreaker breaker; // aggregations execute in a single thread so no atomic here private int count; - public MultiBucketConsumer(int limit, int checkBucketsStepSizeLimit, CircuitBreakerService circuitBreakerService) { + public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; - this.checkBucketsStepSizeLimit = checkBucketsStepSizeLimit; - this.circuitBreakerService = circuitBreakerService; + this.breaker = breaker; } @Override @@ -130,9 +123,8 @@ public void accept(int value) { MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); } - if (value > 0 && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) { - CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); - breaker.addEstimateBytesAndMaybeBreak(0, "check_allocation_buckets"); + if (value > 0 && (count & 0x3FF) == 0) { + breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); } } @@ -150,6 +142,6 @@ public int getLimit() { } public MultiBucketConsumer create() { - return new MultiBucketConsumer(maxBucket, checkBucketsStepSize, this.circuitBreakerService); + return new MultiBucketConsumer(maxBucket, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 73e158334b5d6..68258d02339ac 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -296,26 +296,27 @@ public void testTrippedCircuitBreakerDurability() { public void testAllocationBucketsBreaker() throws Exception { Settings clusterSettings = Settings.builder() - .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "20mb") - .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "true") + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b") + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false") .build(); try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit(); - assertEquals(new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(), parentLimitBytes); + assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes); + CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST); MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = - new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service); - - long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage(); - if (currentMemory > parentLimitBytes) { - CircuitBreakingException exception = - expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(10000)); - assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [check_allocation_buckets] would be")); - assertThat(exception.getMessage(), containsString("which is larger than the limit of [20971520/20mb]")); - } + new MultiBucketConsumerService.MultiBucketConsumer(10000, breaker); + + // make sure used bytes is greater than the total circuit breaker limit + breaker.addWithoutBreaking(200); + + CircuitBreakingException exception = + expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024)); + assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be")); + assertThat(exception.getMessage(), containsString("which is larger than the limit of [100/100b]")); } } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index aaaa2b6c38086..5f289b42d783e 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -59,8 +59,8 @@ /** * A node for testing which allows: *
    - *
  • Overriding Version.CURRENT
  • - *
  • Adding test plugins that exist on the classpath
  • + *
  • Overriding Version.CURRENT
  • + *
  • Adding test plugins that exist on the classpath
  • *
*/ public class MockNode extends Node { @@ -72,27 +72,27 @@ public MockNode(final Settings settings, final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings) { + final Settings settings, + final Collection> classpathPlugins, + final boolean forbidPrivateIndexSettings) { this(settings, classpathPlugins, null, forbidPrivateIndexSettings); } public MockNode( - final Settings settings, - final Collection> classpathPlugins, - final Path configPath, - final boolean forbidPrivateIndexSettings) { + final Settings settings, + final Collection> classpathPlugins, + final Path configPath, + final boolean forbidPrivateIndexSettings) { this( - InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"), - classpathPlugins, - forbidPrivateIndexSettings); + InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"), + classpathPlugins, + forbidPrivateIndexSettings); } private MockNode( - final Environment environment, - final Collection> classpathPlugins, - final boolean forbidPrivateIndexSettings) { + final Environment environment, + final Collection> classpathPlugins, + final boolean forbidPrivateIndexSettings) { super(environment, classpathPlugins, forbidPrivateIndexSettings); this.classpathPlugins = classpathPlugins; } diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 12bb54d7b85e5..9653f3b66a515 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -68,8 +68,8 @@ static void removeActiveContext(SearchContext context) { } public MockSearchService(ClusterService clusterService, - IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, - BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) { + IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, + BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) { super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService); } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index dceec5cbe3df1..f7a2b5c6e0848 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.Weight; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -132,7 +133,7 @@ protected
A createAggregator(AggregationBuilder aggregati IndexSearcher indexSearcher, MappedFieldType... fieldTypes) throws IOException { return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes); } protected A createAggregator(Query query, @@ -141,7 +142,7 @@ protected A createAggregator(Query query, IndexSettings indexSettings, MappedFieldType... fieldTypes) throws IOException { return createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes); } protected A createAggregator(Query query, AggregationBuilder aggregationBuilder, @@ -255,12 +256,12 @@ public boolean shouldCache(Query query) { protected IndexSettings createIndexSettings() { return new IndexSettings( - IndexMetaData.builder("_index").settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .creationDate(System.currentTimeMillis()) - .build(), - Settings.EMPTY + IndexMetaData.builder("_index").settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY ); } @@ -316,7 +317,7 @@ protected A search(IndexSe int maxBucket, MappedFieldType... fieldTypes) throws IOException { MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); a.preCollection(); searcher.search(query, a); @@ -364,12 +365,12 @@ protected A searchAndReduc Query rewritten = searcher.rewrite(query); Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); for (ShardSearcher subSearcher : subSearchers) { MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes); a.preCollection(); subSearcher.search(weight, a); @@ -388,7 +389,7 @@ protected A searchAndReduc int r = randomIntBetween(1, toReduceSize); List toReduce = aggs.subList(0, r); MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, false); @@ -399,7 +400,7 @@ protected A searchAndReduc } // now do the final reduce MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 50c898ba1aad2..7fc375e15a67f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; @@ -271,7 +272,7 @@ public void testReduceRandom() { int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); @SuppressWarnings("unchecked") @@ -288,7 +289,7 @@ public void testReduceRandom() { toReduce.add(reduced); } MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, - DEFAULT_CHECK_BUCKETS_STEP_SIZE, new NoneCircuitBreakerService()); + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true); @SuppressWarnings("unchecked") From 998ff15218ccd2123590e85f6ddd222b3cdd9f9e Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 28 Oct 2019 21:11:22 +0800 Subject: [PATCH 4/4] remove unsued variable --- .../org/elasticsearch/search/SearchService.java | 4 +++- .../aggregations/MultiBucketConsumerService.java | 15 +++++---------- .../search/aggregations/AggregatorTestCase.java | 7 +++---- .../test/InternalAggregationTestCase.java | 1 - 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 2567483464332..b578c325b595b 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -205,7 +206,8 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic this.bigArrays = bigArrays; this.queryPhase = new QueryPhase(); this.fetchPhase = fetchPhase; - this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreakerService); + this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, + circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index eb5ae9fd1cc7d..1fede42b694d7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -40,20 +39,15 @@ */ public class MultiBucketConsumerService { public static final int DEFAULT_MAX_BUCKETS = 10000; - public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 1000; public static final Setting MAX_BUCKET_SETTING = Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting CHECK_BUCKETS_STEP_SIZE_SETTING = - Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE, - -1, Setting.Property.NodeScope, Setting.Property.Dynamic); - - private final CircuitBreakerService circuitBreakerService; + private final CircuitBreaker breaker; private volatile int maxBucket; - public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) { - this.circuitBreakerService = circuitBreakerService; + public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreaker breaker) { + this.breaker = breaker; this.maxBucket = MAX_BUCKET_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); } @@ -123,6 +117,7 @@ public void accept(int value) { MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); } + // check parent circuit breaker every 1024 buckets if (value > 0 && (count & 0x3FF) == 0) { breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); } @@ -142,6 +137,6 @@ public int getLimit() { } public MultiBucketConsumer create() { - return new MultiBucketConsumer(maxBucket, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); + return new MultiBucketConsumer(maxBucket, breaker); } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index f7a2b5c6e0848..cf860a023bd4a 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -91,7 +91,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; -import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_CHECK_BUCKETS_STEP_SIZE; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -197,7 +196,7 @@ public boolean shouldCache(Query query) { when(searchContext.numberOfShards()).thenReturn(1); when(searchContext.searcher()).thenReturn(contextIndexSearcher); when(searchContext.fetchPhase()) - .thenReturn(new FetchPhase(Arrays.asList(new FetchSourceSubPhase(), new DocValueFieldsFetchSubPhase()))); + .thenReturn(new FetchPhase(Arrays.asList(new FetchSourceSubPhase(), new DocValueFieldsFetchSubPhase()))); when(searchContext.bitsetFilterCache()).thenReturn(new BitsetFilterCache(indexSettings, mock(Listener.class))); CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); IndexShard indexShard = mock(IndexShard.class); @@ -355,13 +354,13 @@ protected A searchAndReduc final CompositeReaderContext compCTX = (CompositeReaderContext) ctx; final int size = compCTX.leaves().size(); subSearchers = new ShardSearcher[size]; - for (int searcherIDX = 0; searcherIDX < subSearchers.length; searcherIDX++) { + for(int searcherIDX=0;searcherIDX aggs = new ArrayList<>(); + List aggs = new ArrayList<> (); Query rewritten = searcher.rewrite(query); Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 7fc375e15a67f..3c6c2ea378b87 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -160,7 +160,6 @@ public abstract class InternalAggregationTestCase extends AbstractWireSerializingTestCase { public static final int DEFAULT_MAX_BUCKETS = 100000; - public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 100000; protected static final double TOLERANCE = 1e-10; private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {