Skip to content

Commit

Permalink
Update BucketUtils#suggestShardSideQueueSize signature (#33210)
Browse files Browse the repository at this point in the history
`BucketUtils#suggestShardSideQueueSize` used to calculate the shard_size based on the number of shards. It returns now a different value only based on whether we are querying a single shard or multiple shards. This commit replaces the numberOfShards argument with a boolean that tells whether we are querying a single shard or not.
  • Loading branch information
javanna committed Sep 18, 2018
1 parent 923b33c commit 131f68c
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,21 @@ private BucketUtils() {}
*
* @param finalSize
* The number of terms required in the final reduce phase.
* @param numberOfShards
* The number of shards being queried.
* @param singleShard
* whether a single shard is being queried, or multiple shards
* @return A suggested default for the size of any shard-side PriorityQueues
*/
public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) {
public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
if (finalSize < 1) {
throw new IllegalArgumentException("size must be positive, got " + finalSize);
}
if (numberOfShards < 1) {
throw new IllegalArgumentException("number of shards must be positive, got " + numberOfShards);
}

if (numberOfShards == 1) {
if (singleShard) {
// In the case of a single shard, we do not need to over-request
return finalSize;
}

// Request 50% more buckets on the shards in order to improve accuracy
// as well as a small constant that should help with small values of 'size'
final long shardSampleSize = (long) (finalSize * 1.5 + 10);
return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public int shardSize() {
if (shardSize < 0) {
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards());
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
}

if (requiredSize <= 0 || shardSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}

if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}

// TODO - need to check with mapping that this is indeed a text field....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// heuristic to avoid any wrong-ranking caused by distributed
// counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
context.numberOfShards() == 1));
}
bucketCountThresholds.ensureValidity();
if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,22 @@ public class BucketUtilsTests extends ESTestCase {

public void testBadInput() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(0, 10));
() -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
assertEquals(e.getMessage(), "size must be positive, got 0");

e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(10, 0));
assertEquals(e.getMessage(), "number of shards must be positive, got 0");
}

public void testOptimizesSingleShard() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, 1));
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
}
}

public void testOverFlow() {
for (int iter = 0; iter < 10; ++iter) {
final int size = Integer.MAX_VALUE - randomInt(10);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
}
}
Expand All @@ -55,7 +51,7 @@ public void testShardSizeIsGreaterThanGlobalSize() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
assertThat(shardSize, greaterThanOrEqualTo(size));
}
}
Expand Down

0 comments on commit 131f68c

Please sign in to comment.