Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bucket aggregation circuit breaker optimization. #46751

Merged
merged 8 commits into from
Jan 31, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.MAX_KEEPALIVE_SETTING,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
MultiBucketConsumerService.CHECK_BUCKETS_STEP_SIZE_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Node.WRITE_PORTS_FILE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ long currentMemoryUsage() {
}
}

/**
 * Returns the limit currently configured for the parent circuit breaker, in bytes.
 *
 * @return the value reported by the parent breaker settings
 */
public long getParentLimit() {
    return parentSettings.getLimit();
}

/**
* Checks whether the parent breaker has been tripped
*/
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ protected Node(

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
responseCollectorService, circuitBreakerService);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
Expand Down Expand Up @@ -991,9 +991,10 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
*/
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
return new SearchService(clusterService, indicesService, threadPool,
scriptService, bigArrays, fetchPhase, responseCollectorService);
scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.script.FieldScript;
Expand Down Expand Up @@ -194,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService) {
ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
this.clusterService = clusterService;
Expand All @@ -204,7 +205,7 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
this.bigArrays = bigArrays;
this.queryPhase = new QueryPhase();
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreakerService);

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;

Expand All @@ -38,14 +40,27 @@
*/
public class MultiBucketConsumerService {
public static final int DEFAULT_MAX_BUCKETS = 10000;
public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 1000;
public static final Setting<Integer> MAX_BUCKET_SETTING =
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile int maxBucket;
public static final Setting<Integer> CHECK_BUCKETS_STEP_SIZE_SETTING =
Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE,
-1, Setting.Property.NodeScope, Setting.Property.Dynamic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be in favor of not making it configurable at all and simply checking every 1000 buckets all the time. (Or maybe every 1024, so that the `% 1000` can be replaced with a lighter `& 0x3FF` mask.)


private final CircuitBreakerService circuitBreakerService;

public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
private volatile int maxBucket;
private volatile int checkBucketsStepSize;

public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) {
this.circuitBreakerService = circuitBreakerService;
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
this.checkBucketsStepSize = CHECK_BUCKETS_STEP_SIZE_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECK_BUCKETS_STEP_SIZE_SETTING, value -> {
checkBucketsStepSize = value;
});
}

private void setMaxBucket(int maxBucket) {
Expand Down Expand Up @@ -94,11 +109,16 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final int checkBucketsStepSizeLimit;
private final CircuitBreakerService circuitBreakerService;

// aggregations execute in a single thread so no atomic here
private int count;

public MultiBucketConsumer(int limit) {
public MultiBucketConsumer(int limit, int checkBucketsStepSizeLimit, CircuitBreakerService circuitBreakerService) {
this.limit = limit;
this.checkBucketsStepSizeLimit = checkBucketsStepSizeLimit;
this.circuitBreakerService = circuitBreakerService;
}

@Override
Expand All @@ -109,6 +129,11 @@ public void accept(int value) {
+ "] but was [" + count + "]. This limit can be set by changing the [" +
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
}

if (value > 0 && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) {
CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than looking up the request circuit breaker here, can you have the constructor take a CircuitBreaker instead of a CircuitBreakerService?

breaker.addEstimateBytesAndMaybeBreak(0, "check_allocation_buckets");
}
}

public void reset() {
Expand All @@ -125,6 +150,6 @@ public int getLimit() {
}

public MultiBucketConsumer create() {
return new MultiBucketConsumer(maxBucket);
return new MultiBucketConsumer(maxBucket, checkBucketsStepSize, this.circuitBreakerService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -293,6 +294,31 @@ public void testTrippedCircuitBreakerDurability() {
}
}

public void testAllocationBucketsBreaker() throws Exception {
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "20mb")
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "true")
.build();

try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {

long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
assertEquals(new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(), parentLimitBytes);

MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer =
new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service);

long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage();
if (currentMemory > parentLimitBytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you maybe make the test a bit more predictable by calling addWithoutBreaking with a number of bytes that is greater than the limit?

CircuitBreakingException exception =
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(10000));
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [check_allocation_buckets] would be"));
assertThat(exception.getMessage(), containsString("which is larger than the limit of [20971520/20mb]"));
}
}
}

/**
 * Converts a size given in megabytes into the corresponding number of bytes.
 *
 * @param size the size in megabytes
 * @return the same size expressed in bytes
 */
private long mb(long size) {
    final ByteSizeValue megabytes = new ByteSizeValue(size, ByteSizeUnit.MB);
    return megabytes.getBytes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
actions.put(SearchAction.INSTANCE,
new TransportSearchAction(threadPool, transportService, searchService,
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
Expand Down
38 changes: 20 additions & 18 deletions test/framework/src/main/java/org/elasticsearch/node/MockNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
/**
* A node for testing which allows:
* <ul>
* <li>Overriding Version.CURRENT</li>
* <li>Adding test plugins that exist on the classpath</li>
* <li>Overriding Version.CURRENT</li>
* <li>Adding test plugins that exist on the classpath</li>
* </ul>
*/
public class MockNode extends Node {
Expand All @@ -72,27 +72,27 @@ public MockNode(final Settings settings, final Collection<Class<? extends Plugin
}

public MockNode(
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
this(settings, classpathPlugins, null, forbidPrivateIndexSettings);
}

public MockNode(
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final Path configPath,
final boolean forbidPrivateIndexSettings) {
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final Path configPath,
final boolean forbidPrivateIndexSettings) {
this(
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
classpathPlugins,
forbidPrivateIndexSettings);
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
classpathPlugins,
forbidPrivateIndexSettings);
}

private MockNode(
final Environment environment,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
final Environment environment,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you undo the above unrelated indentation changes?

super(environment, classpathPlugins, forbidPrivateIndexSettings);
this.classpathPlugins = classpathPlugins;
}
Expand Down Expand Up @@ -124,12 +124,14 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
@Override
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
responseCollectorService);
responseCollectorService, circuitBreakerService);
}
return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, fetchPhase, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -67,9 +68,9 @@ static void removeActiveContext(SearchContext context) {
}

public MockSearchService(ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
BigArrays bigArrays, FetchPhase fetchPhase) {
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: can you undo the unrelated indentation changes?

super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService);
}

@Override
Expand Down
Loading