Skip to content

Commit

Permalink
Add HierarchyCircuitBreakerService
Browse files Browse the repository at this point in the history
Adds a breaker for request BigArrays, which are used for parent/child
queries as well as some aggregations. Certain operations like Netty HTTP
responses and transport responses increment the breaker, but will not
trip.

This also changes the output of the nodes' stats endpoint to show the
parent breaker as well as the fielddata and request breakers.

There are a number of new settings for breakers now:

`indices.breaker.total.limit`: starting limit for all memory-use breaker,
defaults to 70%

`indices.breaker.fielddata.limit`: starting limit for fielddata breaker,
defaults to 60%
`indices.breaker.fielddata.overhead`: overhead for fielddata breaker
estimations, defaults to 1.03

(the fielddata breaker settings also use the backwards-compatible
setting `indices.fielddata.breaker.limit` and
`indices.fielddata.breaker.overhead`)

`indices.breaker.request.limit`: starting limit for request breaker,
defaults to 40%
`indices.breaker.request.overhead`: request breaker estimation overhead,
defaults to 1.0

The breaker service infrastructure is now generic and opens the path to
adding additional circuit breakers in the future.

Fixes #6129

Conflicts:
	src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
	src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java
	src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java
	src/main/java/org/elasticsearch/index/fielddata/ordinals/InternalGlobalOrdinalsBuilder.java
	src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java
	src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java
	src/main/java/org/elasticsearch/node/internal/InternalNode.java
	src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java
	src/test/java/org/elasticsearch/index/codec/CodecTests.java
	src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java
	src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
	src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java
	src/test/java/org/elasticsearch/index/query/IndexQueryParserFilterCachingTests.java
	src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java
	src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java
	src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java
	src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java
	src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java
  • Loading branch information
dakrone committed Jul 28, 2014
1 parent df62b9d commit 489b8dc
Show file tree
Hide file tree
Showing 59 changed files with 1,640 additions and 415 deletions.
61 changes: 48 additions & 13 deletions docs/reference/index-modules/fielddata.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,63 @@ field data after a certain time of inactivity. Defaults to `-1`. For
example, can be set to `5m` for a 5 minute expiry.
|=======================================================================

[float]
[[circuit-breaker]]
=== Circuit Breaker

coming[1.4.0,Prior to 1.4.0 there was only a single circuit breaker for fielddata]

Elasticsearch contains multiple circuit breakers used to prevent operations from
causing an OutOfMemoryError. Each breaker specifies a limit for how much memory
it can use. Additionally, there is a parent-level breaker that specifies the
total amount of memory that can be used across all breakers.

The parent-level breaker can be configured with the following setting:

`indices.breaker.total.limit`::
Starting limit for overall parent breaker, defaults to 70% of JVM heap

All circuit breaker settings can be changed dynamically using the cluster update
settings API.

[float]
[[fielddata-circuit-breaker]]
=== Field data circuit breaker
==== Field data circuit breaker
The field data circuit breaker allows Elasticsearch to estimate the amount of
memory a field will required to be loaded into memory. It can then prevent the
field data loading by raising an exception. By default the limit is configured
to 60% of the maximum JVM heap. It can be configured with the following
parameters:

[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|`indices.fielddata.breaker.limit` |Maximum size of estimated field data
to allow loading. Defaults to 60% of the maximum JVM heap.
|`indices.fielddata.breaker.overhead` |A constant that all field data
estimations are multiplied with to determine a final estimation. Defaults to
1.03
|=======================================================================
`indices.breaker.fielddata.limit`::
Limit for fielddata breaker, defaults to 60% of JVM heap

`indices.breaker.fielddata.overhead`::
A constant that all field data estimations are multiplied with to determine a
final estimation. Defaults to 1.03

`indices.fielddata.breaker.limit`::
deprecated[1.4.0,Replaced by `indices.breaker.fielddata.limit`]

`indices.fielddata.breaker.overhead`::
deprecated[1.4.0,Replaced by `indices.breaker.fielddata.overhead`]

[float]
[[request-circuit-breaker]]
==== Request circuit breaker

coming[1.4.0]

The request circuit breaker allows Elasticsearch to prevent per-request data
structures (for example, memory used for calculating aggregations during a
request) from exceeding a certain amount of memory.

`indices.breaker.request.limit`::
Limit for request breaker, defaults to 40% of JVM heap

Both the `indices.fielddata.breaker.limit` and
`indices.fielddata.breaker.overhead` can be changed dynamically using the
cluster update settings API.
`indices.breaker.request.overhead`::
A constant that all request estimations are multiplied with to determine a
final estimation. Defaults to 1

[float]
[[fielddata-monitoring]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
Expand Down Expand Up @@ -75,15 +75,15 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
private HttpStats http;

@Nullable
private FieldDataBreakerStats breaker;
private AllCircuitBreakerStats breaker;

NodeStats() {
}

public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
@Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http,
@Nullable FieldDataBreakerStats breaker) {
@Nullable AllCircuitBreakerStats breaker) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
Expand Down Expand Up @@ -174,7 +174,7 @@ public HttpStats getHttp() {
}

@Nullable
public FieldDataBreakerStats getBreaker() {
public AllCircuitBreakerStats getBreaker() {
return this.breaker;
}

Expand Down Expand Up @@ -215,7 +215,7 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
http = HttpStats.readHttpStats(in);
}
breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in);
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.PluginsModule;
Expand Down Expand Up @@ -187,6 +188,7 @@ public TransportClient(Settings pSettings, boolean loadConfigSettings) throws El
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
Expand Down Expand Up @@ -85,6 +86,11 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
}

public void addDynamicSettings(String... settings) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.breaker;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;

import java.util.concurrent.atomic.AtomicLong;

/**
* Breaker that will check a parent's when incrementing
*/
public class ChildMemoryCircuitBreaker implements CircuitBreaker {

private final long memoryBytesLimit;
private final BreakerSettings settings;
private final double overheadConstant;
private final AtomicLong used;
private final AtomicLong trippedCount;
private final ESLogger logger;
private final HierarchyCircuitBreakerService parent;
private final Name name;

/**
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. This breaker starts with 0 bytes used.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger,
HierarchyCircuitBreakerService parent, Name name) {
this(settings, null, logger, parent, name);
}

/**
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. Uses the given oldBreaker to initialize
* the starting offset.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
* @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
ESLogger logger, HierarchyCircuitBreakerService parent, Name name) {
this.name = name;
this.settings = settings;
this.memoryBytesLimit = settings.getLimit();
this.overheadConstant = settings.getOverhead();
if (oldBreaker == null) {
this.used = new AtomicLong(0);
this.trippedCount = new AtomicLong(0);
} else {
this.used = oldBreaker.used;
this.trippedCount = oldBreaker.trippedCount;
}
this.logger = logger;
if (logger.isTraceEnabled()) {
logger.trace("creating ChildCircuitBreaker with settings {}", this.settings);
}
this.parent = parent;
}

/**
* Method used to trip the breaker, delegates to the parent to determine
* whether to trip the breaker or not
*/
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
this.trippedCount.incrementAndGet();
throw new CircuitBreakingException("[" + this.name + "] Data too large, data for [" +
fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]",
bytesNeeded, this.memoryBytesLimit);
}

/**
* Add a number of bytes, tripping the circuit breaker if the aggregated
* estimates are above the limit. Automatically trips the breaker if the
* memory limit is set to 0. Will never trip the breaker if the limit is
* set < 0, but can still be used to aggregate estimations.
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
* @throws CircuitBreakingException
*/
@Override
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
circuitBreak(label, bytes);
}

long newUsed;
// If there is no limit (-1), we can optimize a bit by using
// .addAndGet() instead of looping (because we don't have to check a
// limit), which makes the RamAccountingTermsEnum case faster.
if (this.memoryBytesLimit == -1) {
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
}
} else {
// Otherwise, check the addition and commit the addition, looping if
// there are conflicts. May result in additional logging, but it's
// trace logging and shouldn't be counted on for additions.
long currentUsed;
do {
currentUsed = this.used.get();
newUsed = currentUsed + bytes;
long newUsedWithOverhead = (long) (newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
this.name,
new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.error("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
this.name,
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
circuitBreak(label, newUsedWithOverhead);
}
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
} while (!this.used.compareAndSet(currentUsed, newUsed));
}

// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit(label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
// breaker has already been incremented
this.used.addAndGet(-bytes);
throw e;
}
return newUsed;
}

/**
* Add an <b>exact</b> number of bytes, not checking for tripping the
* circuit breaker. This bypasses the overheadConstant multiplication.
*
* Also does not check with the parent breaker to see if the parent limit
* has been exceeded.
*
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
*/
@Override
public long addWithoutBreaking(long bytes) {
long u = used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
}
assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
return u;
}

/**
* @return the number of aggregated "used" bytes so far
*/
@Override
public long getUsed() {
return this.used.get();
}

/**
* @return the number of bytes that can be added before the breaker trips
*/
@Override
public long getLimit() {
return this.memoryBytesLimit;
}

/**
* @return the constant multiplier the breaker uses for aggregations
*/
@Override
public double getOverhead() {
return this.overheadConstant;
}

/**
* @return the number of times the breaker has been tripped
*/
@Override
public long getTrippedCount() {
return this.trippedCount.get();
}

/**
* @return the name of the breaker
*/
public Name getName() {
return this.name;
}
}
Loading

0 comments on commit 489b8dc

Please sign in to comment.