Skip to content

Commit

Permalink
Include name of the field that caused a circuit break in the log and …
Browse files Browse the repository at this point in the history
…exception message

Fixes elastic#5718
Closes elastic#5841
  • Loading branch information
dakrone authored and mikemccand committed Apr 24, 2014
1 parent d334845 commit b217ce0
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 30 deletions.
Expand Up @@ -81,9 +81,9 @@ public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, Memory
* Method used to trip the breaker
* @throws CircuitBreakingException
*/
public void circuitBreak() throws CircuitBreakingException {
throw new CircuitBreakingException("Data too large, data would be larger than limit of [" +
memoryBytesLimit + "] bytes");
public void circuitBreak(String fieldName) throws CircuitBreakingException {
throw new CircuitBreakingException("Data too large, data for field [" + fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]");
}

/**
Expand All @@ -95,10 +95,10 @@ public void circuitBreak() throws CircuitBreakingException {
* @return number of "used" bytes so far
* @throws CircuitBreakingException
*/
public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingException {
public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
circuitBreak();
circuitBreak(fieldName);
}

long newUsed;
Expand All @@ -108,8 +108,8 @@ public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingEx
if (this.memoryBytesLimit == -1) {
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}] to used bytes [new used: [{}], limit: [-1b]]",
new ByteSizeValue(bytes), new ByteSizeValue(newUsed));
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed));
}
return newUsed;
}
Expand All @@ -123,16 +123,16 @@ public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingEx
newUsed = currentUsed + bytes;
long newUsedWithOverhead = (long)(newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
new ByteSizeValue(bytes), new ByteSizeValue(newUsed),
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.error("New used memory {} [{}] would be larger than configured breaker: {} [{}], breaking",
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead),
logger.error("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), fieldName,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
circuitBreak();
circuitBreak(fieldName);
}
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.index.fielddata.AbstractIndexFieldData;

import java.io.IOException;

Expand All @@ -39,15 +38,18 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum {
private final MemoryCircuitBreaker breaker;
private final TermsEnum termsEnum;
private final AbstractIndexFieldData.PerValueEstimator estimator;
private final String fieldName;
private long totalBytes;
private long flushBuffer;


public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator) {
public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator,
String fieldName) {
super(termsEnum);
this.breaker = breaker;
this.termsEnum = termsEnum;
this.estimator = estimator;
this.fieldName = fieldName;
this.totalBytes = 0;
this.flushBuffer = 0;
}
Expand All @@ -65,7 +67,7 @@ protected AcceptStatus accept(BytesRef term) throws IOException {
* bytes and resetting the buffer.
*/
public void flush() {
breaker.addEstimateBytesAndMaybeBreak(this.flushBuffer);
breaker.addEstimateBytesAndMaybeBreak(this.flushBuffer, this.fieldName);
this.totalBytes += this.flushBuffer;
this.flushBuffer = 0;
}
Expand Down
Expand Up @@ -99,7 +99,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
PackedArrayAtomicFieldData data = null;
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType());
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType(), getFieldNames().fullName());
if (terms == null) {
data = PackedArrayAtomicFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(data.getMemorySizeInBytes());
Expand Down Expand Up @@ -335,10 +335,12 @@ public class PackedArrayEstimator implements PerValueEstimator {

private final MemoryCircuitBreaker breaker;
private final NumericType type;
private final String fieldName;

public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type) {
public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type, String fieldName) {
this.breaker = breaker;
this.type = type;
this.fieldName = fieldName;
}

/**
Expand All @@ -357,7 +359,7 @@ public long bytesPerValue(BytesRef term) {
*/
@Override
public TermsEnum beforeLoad(Terms terms) throws IOException {
return new RamAccountingTermsEnum(type.wrapTermsEnum(terms.iterator(null)), breaker, this);
return new RamAccountingTermsEnum(type.wrapTermsEnum(terms.iterator(null)), breaker, this, this.fieldName);
}

/**
Expand Down
Expand Up @@ -63,7 +63,7 @@ public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettin
public PagedBytesAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception {
AtomicReader reader = context.reader();

PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker());
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(), getFieldNames().fullName());
Terms terms = reader.terms(getFieldNames().indexName());
if (terms == null) {
PagedBytesAtomicFieldData emptyData = PagedBytesAtomicFieldData.empty(reader.maxDoc());
Expand Down Expand Up @@ -133,11 +133,13 @@ public class PagedBytesEstimator implements PerValueEstimator {

private final AtomicReaderContext context;
private final MemoryCircuitBreaker breaker;
private final String fieldName;
private long estimatedBytes;

PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker) {
PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker, String fieldName) {
this.breaker = breaker;
this.context = context;
this.fieldName = fieldName;
}

/**
Expand Down Expand Up @@ -210,15 +212,15 @@ public TermsEnum beforeLoad(Terms terms) throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("Filter exists, can't circuit break normally, using RamAccountingTermsEnum");
}
return new RamAccountingTermsEnum(filter(terms, reader), breaker, this);
return new RamAccountingTermsEnum(filter(terms, reader), breaker, this, this.fieldName);
} else {
estimatedBytes = this.estimateStringFieldData();
// If we weren't able to estimate, wrap in the RamAccountingTermsEnum
if (estimatedBytes == 0) {
return new RamAccountingTermsEnum(filter(terms, reader), breaker, this);
return new RamAccountingTermsEnum(filter(terms, reader), breaker, this, this.fieldName);
}

breaker.addEstimateBytesAndMaybeBreak(estimatedBytes);
breaker.addEstimateBytesAndMaybeBreak(estimatedBytes, fieldName);
return filter(terms, reader);
}
}
Expand Down
Expand Up @@ -239,7 +239,7 @@ public long bytesPerValue(BytesRef term) {
*/
@Override
public TermsEnum beforeLoad(Terms terms) throws IOException {
return new RamAccountingTermsEnum(filteredEnum, breaker, this);
return new RamAccountingTermsEnum(filteredEnum, breaker, this, "parent/child id cache");
}

/**
Expand Down
Expand Up @@ -49,7 +49,7 @@ public void testThreadedUpdatesToBreaker() throws Exception {
public void run() {
for (int j = 0; j < BYTES_PER_THREAD; j++) {
try {
breaker.addEstimateBytesAndMaybeBreak(1L);
breaker.addEstimateBytesAndMaybeBreak(1L, "test");
} catch (CircuitBreakingException e) {
if (tripped.get()) {
assertThat("tripped too many times", true, equalTo(false));
Expand Down Expand Up @@ -77,19 +77,20 @@ public void run() {
@Test
public void testConstantFactor() throws Exception {
final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger);
String field = "myfield";

// add only 7 bytes
breaker.addWithoutBreaking(7);

try {
// this won't actually add it because it trips the breaker
breaker.addEstimateBytesAndMaybeBreak(3);
breaker.addEstimateBytesAndMaybeBreak(3, field);
fail("should never reach this");
} catch (CircuitBreakingException cbe) {
}

// shouldn't throw an exception
breaker.addEstimateBytesAndMaybeBreak(2);
breaker.addEstimateBytesAndMaybeBreak(2, field);

assertThat(breaker.getUsed(), equalTo(9L));

Expand All @@ -98,9 +99,10 @@ public void testConstantFactor() throws Exception {

try {
// Adding no bytes still breaks
breaker.addEstimateBytesAndMaybeBreak(0);
breaker.addEstimateBytesAndMaybeBreak(0, field);
fail("should never reach this");
} catch (CircuitBreakingException cbe) {
assertThat(cbe.getMessage().contains("field [" + field + "]"), equalTo(true));
}
}
}
Expand Up @@ -83,7 +83,8 @@ public void testMemoryBreaker() {
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR, containsString("Data too large, data would be larger than limit of [100] bytes"));
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
Expand Down Expand Up @@ -134,7 +135,8 @@ public void testRamAccountingTermsEnum() {
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR, containsString("Data too large, data would be larger than limit of [100] bytes"));
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
Expand Down

0 comments on commit b217ce0

Please sign in to comment.