Skip to content

Commit

Permalink
Fix bug in circuit-breaker check for geoshape grid aggregations (#57962) (#59741)

There was a bug in the geoshape circuit-breaker check where the
hash values array was being allocated before its new size was
accounted for by the circuit breaker.

Fixes #57847.
  • Loading branch information
talevy committed Jul 17, 2020
1 parent 199e44f commit dafd103
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.search.DocIdSetIterator;

import java.io.IOException;
import java.util.function.LongConsumer;

/**
* Base implementation that throws an {@link IOException} for the
Expand All @@ -31,6 +32,14 @@
*/
public abstract class AbstractSortingNumericDocValues extends SortingNumericDocValues {

public AbstractSortingNumericDocValues() {
super();
}

/**
 * Creates an instance whose memory use is reported to the given callback.
 *
 * @param circuitBreakerConsumer callback handed unchanged to the superclass
 *        constructor; {@link SortingNumericDocValues} invokes it with byte
 *        deltas when its values array is allocated and resized
 */
public AbstractSortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
super(circuitBreakerConsumer);
}

@Override
public int docID() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.util.InPlaceMergeSorter;
import org.apache.lucene.util.Sorter;

import java.util.function.LongConsumer;

/**
* Base class for building {@link SortedNumericDocValues} instances based on unsorted content.
*/
Expand All @@ -33,8 +35,13 @@ public abstract class SortingNumericDocValues extends SortedNumericDocValues {
protected long[] values;
protected int valuesCursor;
private final Sorter sorter;
private LongConsumer circuitBreakerConsumer;

/**
 * Creates an instance that does not report its memory use anywhere: the
 * byte deltas are handed to a callback that simply discards them.
 */
protected SortingNumericDocValues() {
    this(ignoredBytes -> {});
}

protected SortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
values = new long[1];
valuesCursor = 0;
sorter = new InPlaceMergeSorter() {
Expand All @@ -51,6 +58,9 @@ protected int compare(int i, int j) {
return Long.compare(values[i], values[j]);
}
};
this.circuitBreakerConsumer = circuitBreakerConsumer;
// account for initial values size of 1
this.circuitBreakerConsumer.accept(Long.BYTES);
}

/**
Expand All @@ -59,8 +69,25 @@ protected int compare(int i, int j) {
*/
protected final void resize(int newSize) {
    count = newSize;
    valuesCursor = 0;

    // The backing array is never shrunk: if it already has room for newSize
    // entries there is nothing to allocate and nothing to account for.
    // NOTE: the array must NOT be grown before this check, otherwise the check
    // always succeeds and the circuit breaker is never consulted.
    if (newSize <= values.length) {
        return;
    }

    // Array is expected to grow so increment the circuit breaker
    // to include both the additional bytes used by the grown array
    // as well as the overhead of keeping both arrays in memory while
    // copying. The breaker is charged BEFORE the allocation so that it can
    // reject the request before the memory is actually taken.
    //
    // Sizes are computed with long arithmetic: length * Long.BYTES evaluated
    // as int would overflow for arrays of 2^28 elements or more.
    final long oldValuesSizeInBytes = (long) values.length * Long.BYTES;
    final int newValuesLength = ArrayUtil.oversize(newSize, Long.BYTES);
    circuitBreakerConsumer.accept((long) newValuesLength * Long.BYTES);

    // resize to exactly the length that was just accounted for
    values = ArrayUtil.growExact(values, newValuesLength);

    // account for freeing the old values array
    circuitBreakerConsumer.accept(-oldValuesSizeInBytes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Sorted numeric doc values for precision 0 */
class AllCellValues extends ByteTrackingSortingNumericDocValues {
private MultiGeoShapeValues geoValues;

/**
 * Creates doc values that always hold exactly one cell: the one produced by
 * {@code tiler.encode(0, 0, 0)}.
 *
 * @param geoValues              the geo-shape values this instance is backed by
 * @param tiler                  tiler used to encode the single cell
 * @param circuitBreakerConsumer callback passed to the superclass, which reports
 *                               the bytes used by the values array to it
 */
protected AllCellValues(MultiGeoShapeValues geoValues, GeoGridTiler tiler, LongConsumer circuitBreakerConsumer) {
    super(circuitBreakerConsumer);
    this.geoValues = geoValues;
    resize(1);
    values[0] = tiler.encode(0, 0, 0);
    // No explicit circuitBreakerConsumer.accept(Long.BYTES) here: the
    // SortingNumericDocValues constructor already accounts for the initial
    // one-element array, so charging it again would count those bytes twice.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@

import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;

import java.util.function.LongConsumer;

/**
* Wrapper class for GeoGrid to expose the protected values array for testing
*/
abstract class ByteTrackingSortingNumericDocValues extends AbstractSortingNumericDocValues {

public long getValuesBytes() {
ByteTrackingSortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
super(circuitBreakerConsumer);
}

long getValuesBytes() {
return values.length * Long.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;

import java.util.function.Consumer;
import java.util.function.LongConsumer;

public class GeoShapeCellIdSource extends ValuesSource.Numeric {
private final GeoShapeValuesSource valuesSource;
private final int precision;
private final GeoGridTiler encoder;
private Consumer<Long> circuitBreakerConsumer;
private LongConsumer circuitBreakerConsumer;

public GeoShapeCellIdSource(GeoShapeValuesSource valuesSource, int precision, GeoGridTiler encoder) {
this.valuesSource = valuesSource;
Expand All @@ -36,7 +36,7 @@ public GeoShapeCellIdSource(GeoShapeValuesSource valuesSource, int precision, Ge
* accessible from within the values-source. Problem is that this values-source needs to
* be created and passed to the aggregator before we have access to this functionality.
*/
public void setCircuitBreakerConsumer(Consumer<Long> circuitBreakerConsumer) {
public void setCircuitBreakerConsumer(LongConsumer circuitBreakerConsumer) {
this.circuitBreakerConsumer = circuitBreakerConsumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Sorted numeric doc values for geo shapes */
class GeoShapeCellValues extends ByteTrackingSortingNumericDocValues {
private final MultiGeoShapeValues geoShapeValues;
private final Consumer<Long> circuitBreakerConsumer;
protected int precision;
protected GeoGridTiler tiler;

protected GeoShapeCellValues(MultiGeoShapeValues geoShapeValues, int precision, GeoGridTiler tiler,
Consumer<Long> circuitBreakerConsumer) {
LongConsumer circuitBreakerConsumer) {
super(circuitBreakerConsumer);
this.geoShapeValues = geoShapeValues;
this.precision = precision;
this.tiler = tiler;
this.circuitBreakerConsumer = circuitBreakerConsumer;
circuitBreakerConsumer.accept((long) Long.BYTES);
}

@Override
Expand All @@ -45,18 +43,13 @@ protected long[] getValues() {
return values;
}

protected void add(int idx, long value) {
values[idx] = value;
}

void resizeCell(int newSize) {
int oldValuesLength = values.length;
resize(newSize);
int newValuesLength = values.length;
if (newValuesLength > oldValuesLength) {
long bytesDiff = (newValuesLength - oldValuesLength) * Long.BYTES;
circuitBreakerConsumer.accept(bytesDiff);
}
}


protected void add(int idx, long value) {
values[idx] = value;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.LinearRing;
import org.elasticsearch.geometry.MultiLine;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.MultiPolygon;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.geometry.Polygon;
Expand All @@ -34,9 +33,11 @@
import org.elasticsearch.xpack.spatial.index.fielddata.TriangleTreeReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.List;
import java.util.function.LongConsumer;

import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.LATITUDE_MASK;
import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.NORMALIZED_LATITUDE_MASK;
Expand All @@ -50,7 +51,7 @@
public class GeoGridTilerTests extends ESTestCase {
private static final GeoTileGridTiler GEOTILE = new GeoTileGridTiler();
private static final GeoHashGridTiler GEOHASH = new GeoHashGridTiler();
private static final Consumer<Long> NOOP_BREAKER = (l) -> {};
private static final LongConsumer NOOP_BREAKER = (l) -> {};

public void testGeoTile() throws Exception {
double x = randomDouble();
Expand Down Expand Up @@ -464,33 +465,40 @@ public void testGeoTileGridCircuitBreaker() throws IOException {
}

// Records the byte deltas that the cell-values classes report to their
// circuit-breaker callback while tiling a random shape, then repeats the same
// work against a real breaker whose limit is one byte below the computed
// requirement and expects a CircuitBreakingException.
//
// NOTE(review): this listing appears to contain both the pre-change and the
// post-change lines of the method (e.g. `precision`, `reader` and `values`
// are each declared twice, and both `numBytes` and `curNumBytes`/`maxNumBytes`
// are used) — confirm against the repository before compiling.
private void testCircuitBreaker(GeoGridTiler tiler) throws IOException {
MultiPoint multiPoint = GeometryTestUtils.randomMultiPoint(false);
int precision = randomIntBetween(0, 6);
TriangleTreeReader reader = triangleTreeReader(multiPoint, GeoShapeCoordinateEncoder.INSTANCE);
Polygon polygon = GeometryTestUtils.randomPolygon(false);
int precision = randomIntBetween(0, 5);
TriangleTreeReader reader = triangleTreeReader(polygon, GeoShapeCoordinateEncoder.INSTANCE);
MultiGeoShapeValues.GeoShapeValue value = new MultiGeoShapeValues.GeoShapeValue(reader);

final long numBytes;
// Every delta passed to the circuit-breaker callback is appended here, in order.
List<Long> byteChangeHistory = new ArrayList<>();
if (precision == 0) {
AllCellValues values = new AllCellValues(null, tiler, NOOP_BREAKER);
numBytes = values.getValuesBytes();
new AllCellValues(null, tiler, byteChangeHistory::add);
} else {
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, NOOP_BREAKER);
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, byteChangeHistory::add);
tiler.setValues(values, value, precision);
numBytes = values.getValuesBytes();
}

// Derive the byte counts from the recorded deltas. With a single entry that
// entry is both the current and the maximum usage. Otherwise the last entry
// is negated and taken as the size of the array that was released, and the
// entry before it as the size of the array that replaced it; the maximum
// counts both, since SortingNumericDocValues.resize charges the new array
// before releasing the old one.
final long maxNumBytes;
final long curNumBytes;
if (byteChangeHistory.size() == 1) {
curNumBytes = maxNumBytes = byteChangeHistory.get(byteChangeHistory.size() - 1);
} else {
long oldNumBytes = -byteChangeHistory.get(byteChangeHistory.size() - 1);
curNumBytes = byteChangeHistory.get(byteChangeHistory.size() - 2);
maxNumBytes = oldNumBytes + curNumBytes;
}

// Breaker limited to one byte less than the computed requirement.
CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY,
Collections.singletonList(new BreakerSettings("limited", numBytes - 1, 1.0)),
Collections.singletonList(new BreakerSettings("limited", maxNumBytes - 1, 1.0)),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
CircuitBreaker limitedBreaker = service.getBreaker("limited");

Consumer<Long> circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");
LongConsumer circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");
// The same tiling, now charged against the limited breaker, is expected to
// throw from somewhere inside this lambda.
expectThrows(CircuitBreakingException.class, () -> {
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, circuitBreakerConsumer);
tiler.setValues(values, value, precision);
assertThat(values.getValuesBytes(), equalTo(numBytes));
assertThat(limitedBreaker.getUsed(), equalTo(numBytes));
assertThat(values.getValuesBytes(), equalTo(curNumBytes));
assertThat(limitedBreaker.getUsed(), equalTo(curNumBytes));
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
indices.refresh: {}

- do:
catch: /data for \[<agg \[grid\]>\] would be \[28648\/27.9kb\], which is larger than the limit of \[25600\/25kb\]/
catch: /data for \[<agg \[grid\]>\] would be \[42760\/41.7kb\], which is larger than the limit of \[25600\/25kb\]/
search:
rest_total_hits_as_int: true
index: locations
Expand All @@ -93,7 +93,7 @@
field: location

- do:
catch: /data for \[<agg \[grid\]>\] would be \[28648\/27.9kb\], which is larger than the limit of \[25600\/25kb\]/
catch: /data for \[<agg \[grid\]>\] would be \[42760\/41.7kb\], which is larger than the limit of \[25600\/25kb\]/
search:
rest_total_hits_as_int: true
index: locations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
indices.refresh: {}

- do:
catch: /data for \[<agg \[grid\]>\] would be \[26344\/25.7kb\], which is larger than the limit of \[25600\/25kb\]/
catch: /data for \[<agg \[grid\]>\] would be \[30160\/29.4kb\], which is larger than the limit of \[25600\/25kb\]/
search:
rest_total_hits_as_int: true
index: locations
Expand All @@ -95,7 +95,7 @@
field: location

- do:
catch: /data for \[<agg \[grid\]>\] would be \[26344\/25.7kb\], which is larger than the limit of \[25600\/25kb\]/
catch: /data for \[<agg \[grid\]>\] would be \[30160\/29.4kb\], which is larger than the limit of \[25600\/25kb\]/
search:
rest_total_hits_as_int: true
index: locations
Expand Down

0 comments on commit dafd103

Please sign in to comment.