Skip to content

Commit

Permalink
ESQL: Prevent topn from using too much memory (#99611)
Browse files Browse the repository at this point in the history
This prevents topn operations from using too much memory by hooking them
into circuit breaking framework. It builds on the work done in
#99316 that moved all topn
storage to byte arrays by adding circuit breaking to process of growing
the underlying byte array.
  • Loading branch information
nik9000 committed Sep 18, 2023
1 parent d58b9ea commit 1a14bc7
Show file tree
Hide file tree
Showing 37 changed files with 738 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.elasticsearch.benchmark.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
Expand All @@ -20,6 +23,8 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -96,7 +101,13 @@ private static Operator operator(String data, int topCount) {
case LONGS_AND_BYTES_REFS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.UTF8);
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(
Settings.EMPTY,
List.of(),
ClusterSettings.createBuiltInClusterSettings()
);
return new TopNOperator(
breakerService.getBreaker(CircuitBreaker.REQUEST),
topCount,
elementTypes,
encoders,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,15 +675,26 @@ public LimitedBreaker(String name, ByteSizeValue max) {

@Override
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
long total = used.addAndGet(bytes);
if (total > max.getBytes()) {
throw new CircuitBreakingException(ERROR_MESSAGE, bytes, max.getBytes(), Durability.TRANSIENT);
while (true) {
long old = used.get();
long total = old + bytes;
if (total > max.getBytes()) {
throw new CircuitBreakingException(ERROR_MESSAGE, bytes, max.getBytes(), Durability.TRANSIENT);
}
if (used.compareAndSet(old, total)) {
break;
}
}
}

@Override
public void addWithoutBreaking(long bytes) {
used.addAndGet(bytes);
}

@Override
public long getUsed() {
return used.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

abstract class KeyExtractorForBoolean implements KeyExtractor {
static KeyExtractorForBoolean extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, BooleanBlock block) {
Expand All @@ -37,13 +37,13 @@ static KeyExtractorForBoolean extractorFor(TopNEncoder encoder, boolean ascendin
this.nonNul = nonNul;
}

protected final int nonNul(BytesRefBuilder key, boolean value) {
protected final int nonNul(BreakingBytesRefBuilder key, boolean value) {
key.append(nonNul);
TopNEncoder.DEFAULT_SORTABLE.encodeBoolean(value, key);
return Byte.BYTES + 1;
}

protected final int nul(BytesRefBuilder key) {
protected final int nul(BreakingBytesRefBuilder key) {
key.append(nul);
return 1;
}
Expand All @@ -57,7 +57,7 @@ static class ForVector extends KeyExtractorForBoolean {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
return nonNul(key, vector.getBoolean(position));
}
}
Expand All @@ -71,7 +71,7 @@ static class MinForAscending extends KeyExtractorForBoolean {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -88,7 +88,7 @@ static class MaxForAscending extends KeyExtractorForBoolean {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -105,7 +105,7 @@ static class MinForUnordered extends KeyExtractorForBoolean {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand All @@ -130,7 +130,7 @@ static class MaxForUnordered extends KeyExtractorForBoolean {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

abstract class KeyExtractorForBytesRef implements KeyExtractor {
static KeyExtractorForBytesRef extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, BytesRefBlock block) {
Expand Down Expand Up @@ -40,12 +40,12 @@ static KeyExtractorForBytesRef extractorFor(TopNEncoder encoder, boolean ascendi
this.nonNul = nonNul;
}

protected final int nonNul(BytesRefBuilder key, BytesRef value) {
protected final int nonNul(BreakingBytesRefBuilder key, BytesRef value) {
key.append(nonNul);
return encoder.encodeBytesRef(value, key) + 1;
}

protected final int nul(BytesRefBuilder key) {
protected final int nul(BreakingBytesRefBuilder key) {
key.append(nul);
return 1;
}
Expand All @@ -59,7 +59,7 @@ static class ForVector extends KeyExtractorForBytesRef {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
return nonNul(key, vector.getBytesRef(position, scratch));
}
}
Expand All @@ -73,7 +73,7 @@ static class MinForAscending extends KeyExtractorForBytesRef {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -90,7 +90,7 @@ static class MaxForAscending extends KeyExtractorForBytesRef {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -109,7 +109,7 @@ static class MinForUnordered extends KeyExtractorForBytesRef {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand Down Expand Up @@ -140,7 +140,7 @@ static class MaxForUnordered extends KeyExtractorForBytesRef {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

abstract class KeyExtractorForDouble implements KeyExtractor {
static KeyExtractorForDouble extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, DoubleBlock block) {
Expand All @@ -37,13 +37,13 @@ static KeyExtractorForDouble extractorFor(TopNEncoder encoder, boolean ascending
this.nonNul = nonNul;
}

protected final int nonNul(BytesRefBuilder key, double value) {
protected final int nonNul(BreakingBytesRefBuilder key, double value) {
key.append(nonNul);
TopNEncoder.DEFAULT_SORTABLE.encodeDouble(value, key);
return Double.BYTES + 1;
}

protected final int nul(BytesRefBuilder key) {
protected final int nul(BreakingBytesRefBuilder key) {
key.append(nul);
return 1;
}
Expand All @@ -57,7 +57,7 @@ static class ForVector extends KeyExtractorForDouble {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
return nonNul(key, vector.getDouble(position));
}
}
Expand All @@ -71,7 +71,7 @@ static class MinForAscending extends KeyExtractorForDouble {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -88,7 +88,7 @@ static class MaxForAscending extends KeyExtractorForDouble {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -105,7 +105,7 @@ static class MinForUnordered extends KeyExtractorForDouble {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand All @@ -129,7 +129,7 @@ static class MaxForUnordered extends KeyExtractorForDouble {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

abstract class KeyExtractorForInt implements KeyExtractor {
static KeyExtractorForInt extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, IntBlock block) {
Expand All @@ -37,13 +37,13 @@ static KeyExtractorForInt extractorFor(TopNEncoder encoder, boolean ascending, b
this.nonNul = nonNul;
}

protected final int nonNul(BytesRefBuilder key, int value) {
protected final int nonNul(BreakingBytesRefBuilder key, int value) {
key.append(nonNul);
TopNEncoder.DEFAULT_SORTABLE.encodeInt(value, key);
return Integer.BYTES + 1;
}

protected final int nul(BytesRefBuilder key) {
protected final int nul(BreakingBytesRefBuilder key) {
key.append(nul);
return 1;
}
Expand All @@ -57,7 +57,7 @@ static class ForVector extends KeyExtractorForInt {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
return nonNul(key, vector.getInt(position));
}
}
Expand All @@ -71,7 +71,7 @@ static class MinForAscending extends KeyExtractorForInt {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -88,7 +88,7 @@ static class MaxForAscending extends KeyExtractorForInt {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
if (block.isNull(position)) {
return nul(key);
}
Expand All @@ -105,7 +105,7 @@ static class MinForUnordered extends KeyExtractorForInt {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand All @@ -129,7 +129,7 @@ static class MaxForUnordered extends KeyExtractorForInt {
}

@Override
public int writeKey(BytesRefBuilder key, int position) {
public int writeKey(BreakingBytesRefBuilder key, int position) {
int size = block.getValueCount(position);
if (size == 0) {
return nul(key);
Expand Down

0 comments on commit 1a14bc7

Please sign in to comment.