Skip to content

Commit

Permalink
bucket aggregation circuit breaker optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
howardhuanghua committed Sep 17, 2019
1 parent de17140 commit 468cf11
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.MAX_KEEPALIVE_SETTING,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
MultiBucketConsumerService.CHECK_BUCKETS_STEP_SIZE_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Node.WRITE_PORTS_FILE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ long currentMemoryUsage() {
}
}

public long getParentLimit() {
return this.parentSettings.getLimit();
}

/**
* Checks whether the parent breaker has been tripped
*/
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ protected Node(

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
responseCollectorService, circuitBreakerService);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
Expand Down Expand Up @@ -991,9 +991,10 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
*/
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
return new SearchService(clusterService, indicesService, threadPool,
scriptService, bigArrays, fetchPhase, responseCollectorService);
scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.script.FieldScript;
Expand Down Expand Up @@ -194,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService) {
ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
this.clusterService = clusterService;
Expand All @@ -204,7 +205,7 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
this.bigArrays = bigArrays;
this.queryPhase = new QueryPhase();
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreakerService);

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;

Expand All @@ -38,14 +40,27 @@
*/
public class MultiBucketConsumerService {
public static final int DEFAULT_MAX_BUCKETS = 10000;
public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 10000;
public static final Setting<Integer> MAX_BUCKET_SETTING =
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile int maxBucket;
public static final Setting<Integer> CHECK_BUCKETS_STEP_SIZE_SETTING =
Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE,
-1, Setting.Property.NodeScope, Setting.Property.Dynamic);

private final CircuitBreakerService circuitBreakerService;

public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
private volatile int maxBucket;
private volatile int checkBucketsStepSize;

public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) {
this.circuitBreakerService = circuitBreakerService;
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
this.checkBucketsStepSize = CHECK_BUCKETS_STEP_SIZE_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECK_BUCKETS_STEP_SIZE_SETTING, value -> {
checkBucketsStepSize = value;
});
}

private void setMaxBucket(int maxBucket) {
Expand Down Expand Up @@ -94,11 +109,16 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final int checkBucketsStepSizeLimit;
private final CircuitBreakerService circuitBreakerService;

// aggregations execute in a single thread so no atomic here
private int count;

public MultiBucketConsumer(int limit) {
public MultiBucketConsumer(int limit, int checkBucketsStepSizeLimit, CircuitBreakerService circuitBreakerService) {
this.limit = limit;
this.checkBucketsStepSizeLimit = checkBucketsStepSizeLimit;
this.circuitBreakerService = circuitBreakerService;
}

@Override
Expand All @@ -109,6 +129,11 @@ public void accept(int value) {
+ "] but was [" + count + "]. This limit can be set by changing the [" +
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
}

if (value > 0 && this.circuitBreakerService instanceof HierarchyCircuitBreakerService
&& checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) {
((HierarchyCircuitBreakerService) this.circuitBreakerService).checkParentLimit(0, "check_allocation_buckets");
}
}

public void reset() {
Expand All @@ -125,6 +150,6 @@ public int getLimit() {
}

public MultiBucketConsumer create() {
return new MultiBucketConsumer(maxBucket);
return new MultiBucketConsumer(maxBucket, checkBucketsStepSize, this.circuitBreakerService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -293,6 +294,31 @@ public void testTrippedCircuitBreakerDurability() {
}
}

public void testAllocationBucketsBreaker() throws Exception {
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "20mb")
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "true")
.build();

try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {

long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
assertEquals(new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(), parentLimitBytes);

MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer =
new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service);

long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage();
if (currentMemory > parentLimitBytes) {
CircuitBreakingException exception =
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(10000));
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [check_allocation_buckets] would be"));
assertThat(exception.getMessage(), containsString("which is larger than the limit of [20971520/20mb]"));
}
}
}

private long mb(long size) {
return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
actions.put(SearchAction.INSTANCE,
new TransportSearchAction(threadPool, transportService, searchService,
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
Expand Down
38 changes: 20 additions & 18 deletions test/framework/src/main/java/org/elasticsearch/node/MockNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
/**
* A node for testing which allows:
* <ul>
* <li>Overriding Version.CURRENT</li>
* <li>Adding test plugins that exist on the classpath</li>
* <li>Overriding Version.CURRENT</li>
* <li>Adding test plugins that exist on the classpath</li>
* </ul>
*/
public class MockNode extends Node {
Expand All @@ -72,27 +72,27 @@ public MockNode(final Settings settings, final Collection<Class<? extends Plugin
}

public MockNode(
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
this(settings, classpathPlugins, null, forbidPrivateIndexSettings);
}

public MockNode(
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final Path configPath,
final boolean forbidPrivateIndexSettings) {
final Settings settings,
final Collection<Class<? extends Plugin>> classpathPlugins,
final Path configPath,
final boolean forbidPrivateIndexSettings) {
this(
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
classpathPlugins,
forbidPrivateIndexSettings);
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
classpathPlugins,
forbidPrivateIndexSettings);
}

private MockNode(
final Environment environment,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
final Environment environment,
final Collection<Class<? extends Plugin>> classpathPlugins,
final boolean forbidPrivateIndexSettings) {
super(environment, classpathPlugins, forbidPrivateIndexSettings);
this.classpathPlugins = classpathPlugins;
}
Expand Down Expand Up @@ -124,12 +124,14 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
@Override
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
responseCollectorService);
responseCollectorService, circuitBreakerService);
}
return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, fetchPhase, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -67,9 +68,9 @@ static void removeActiveContext(SearchContext context) {
}

public MockSearchService(ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
BigArrays bigArrays, FetchPhase fetchPhase) {
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService);
}

@Override
Expand Down
Loading

0 comments on commit 468cf11

Please sign in to comment.