Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add single request circuit breaker. #46962

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ public void circuitBreak(String fieldName, long bytesNeeded) {
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
}

/**
* Method used to trip the single request breaker
*/
public void singleRequestCircuitBreak(String fieldName, long bytesNeeded) {
this.trippedCount.incrementAndGet();
final String message = "[single_request] Data too large, data for [" + fieldName + "]" +
" would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
", which is larger than the single request limit of [" + parent.getSingleRequestBreakerLimit() +
"/" + new ByteSizeValue(parent.getSingleRequestBreakerLimit()) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
}

/**
* Add a number of bytes, tripping the circuit breaker if the aggregated
* estimates are above the limit. Automatically trips the breaker if the
Expand All @@ -113,6 +126,11 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws Cir
circuitBreak(label, bytes);
}

// check single request breaker
if (name.equals(REQUEST) && bytes > parent.getSingleRequestBreakerLimit()) {
singleRequestCircuitBreak(label, bytes);
}

long newUsed;
// If there is no limit (-1), we can optimize a bit by using
// .addAndGet() instead of looping (because we don't have to check a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.SINGLE_REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
IndexModule.NODE_STORE_ALLOW_MMAP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

public static final Setting<ByteSizeValue> SINGLE_REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("indices.breaker.single_request.limit", "30%", Property.Dynamic, Property.NodeScope);

public static final Setting<ByteSizeValue> ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("indices.breaker.accounting.limit", "100%", Property.Dynamic, Property.NodeScope);
public static final Setting<Double> ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING =
Expand All @@ -93,6 +96,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

private long singleRequestMemoryBytesLimit;
private final boolean trackRealMemoryUsage;
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
Expand Down Expand Up @@ -142,6 +146,7 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster
}

this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);
this.singleRequestMemoryBytesLimit = SINGLE_REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes();

registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
Expand All @@ -156,6 +161,7 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
this::setRequestBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(SINGLE_REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, this::setSingleRequestBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
this::setAccountingBreakerLimit);
}
Expand All @@ -168,6 +174,15 @@ private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newReque
logger.info("Updated breaker settings request: {}", newRequestSettings);
}

private void setSingleRequestBreakerLimit(ByteSizeValue newSingleRequestMax) {
singleRequestMemoryBytesLimit = newSingleRequestMax.getBytes();
logger.info("Updated breaker settings single request: {}", singleRequestMemoryBytesLimit);
}

public final long getSingleRequestBreakerLimit() {
return singleRequestMemoryBytesLimit;
}

private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) {
BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
newInFlightRequestsMax.getBytes(), newInFlightRequestsOverhead, this.inFlightRequestsSettings.getType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,21 @@ public void testTrippedCircuitBreakerDurability() {
}
}

public void testSingleRequestBreaker() throws Exception {
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50mb")
.put(HierarchyCircuitBreakerService.SINGLE_REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "25mb")
.build();
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST);
CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> requestCircuitBreaker
.addEstimateBytesAndMaybeBreak(new ByteSizeValue(30, ByteSizeUnit.MB).getBytes(), "should break"));
assertThat(exception.getMessage(), containsString("[single_request] Data too large, data for [should break] " +
"would be [31457280/30mb], which is larger than the single request limit of [26214400/25mb]"));
}
}

private long mb(long size) {
return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes();
}
Expand Down