Skip to content

Commit

Permalink
Remove non-breaking block factory from production (#103661)
Browse files Browse the repository at this point in the history
We can now remove the non-breaking block factory from production code.
All blocks are now properly tracked, except in two small places in the planner.
  • Loading branch information
dnhatn committed Dec 21, 2023
1 parent 5d8e297 commit 509dfbd
Show file tree
Hide file tree
Showing 42 changed files with 167 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.benchmark.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.BytesRefArray;
Expand Down Expand Up @@ -111,6 +112,11 @@ public class BlockBenchmark {

private static final Random random = new Random();

private static final BlockFactory blockFactory = BlockFactory.getInstance(
new NoopCircuitBreaker("noop"),
BigArrays.NON_RECYCLING_INSTANCE
);

static {
// Smoke test all the expected values and force loading subclasses more like prod
int totalPositions = 10;
Expand All @@ -131,7 +137,6 @@ public class BlockBenchmark {
private record BenchmarkBlocks(Block[] blocks, long[] checkSums) {};

private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, int totalPositions) {
BlockFactory blockFactory = BlockFactory.getNonBreakingInstance();
Block[] blocks = new Block[NUM_BLOCKS_PER_ITERATION];
long[] checkSums = new long[NUM_BLOCKS_PER_ITERATION];

Expand Down Expand Up @@ -189,7 +194,7 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
firstValueIndexes,
nulls,
Block.MvOrdering.UNORDERED,
BlockFactory.getNonBreakingInstance()
blockFactory
);
}
case "vector" -> {
Expand Down Expand Up @@ -314,9 +319,7 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
int[] firstValueIndexes = randomFirstValueIndexes(totalPositions);
int positionCount = firstValueIndexes.length - 1;
BitSet nulls = randomNulls(positionCount);
DoubleArray valuesBigArray = BlockFactory.getNonBreakingInstance()
.bigArrays()
.newDoubleArray(totalPositions, false);
DoubleArray valuesBigArray = blockFactory.bigArrays().newDoubleArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand All @@ -327,17 +330,15 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
firstValueIndexes,
nulls,
Block.MvOrdering.UNORDERED,
BlockFactory.getNonBreakingInstance()
blockFactory
);
}
case "vector" -> {
DoubleVector vector = blockFactory.newDoubleArrayVector(values, totalPositions);
blocks[blockIndex] = vector.asBlock();
}
case "vector-big-array" -> {
DoubleArray valuesBigArray = BlockFactory.getNonBreakingInstance()
.bigArrays()
.newDoubleArray(totalPositions, false);
DoubleArray valuesBigArray = blockFactory.bigArrays().newDoubleArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand Down Expand Up @@ -395,7 +396,7 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
int[] firstValueIndexes = randomFirstValueIndexes(totalPositions);
int positionCount = firstValueIndexes.length - 1;
BitSet nulls = randomNulls(positionCount);
IntArray valuesBigArray = BlockFactory.getNonBreakingInstance().bigArrays().newIntArray(totalPositions, false);
IntArray valuesBigArray = blockFactory.bigArrays().newIntArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand All @@ -406,15 +407,15 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
firstValueIndexes,
nulls,
Block.MvOrdering.UNORDERED,
BlockFactory.getNonBreakingInstance()
blockFactory
);
}
case "vector" -> {
IntVector vector = blockFactory.newIntArrayVector(values, totalPositions);
blocks[blockIndex] = vector.asBlock();
}
case "vector-big-array" -> {
IntArray valuesBigArray = BlockFactory.getNonBreakingInstance().bigArrays().newIntArray(totalPositions, false);
IntArray valuesBigArray = blockFactory.bigArrays().newIntArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand Down Expand Up @@ -472,9 +473,7 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
int[] firstValueIndexes = randomFirstValueIndexes(totalPositions);
int positionCount = firstValueIndexes.length - 1;
BitSet nulls = randomNulls(positionCount);
LongArray valuesBigArray = BlockFactory.getNonBreakingInstance()
.bigArrays()
.newLongArray(totalPositions, false);
LongArray valuesBigArray = blockFactory.bigArrays().newLongArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand All @@ -485,17 +484,15 @@ private static BenchmarkBlocks buildBlocks(String dataType, String blockKind, in
firstValueIndexes,
nulls,
Block.MvOrdering.UNORDERED,
BlockFactory.getNonBreakingInstance()
blockFactory
);
}
case "vector" -> {
LongVector vector = blockFactory.newLongArrayVector(values, totalPositions);
blocks[blockIndex] = vector.asBlock();
}
case "vector-big-array" -> {
LongArray valuesBigArray = BlockFactory.getNonBreakingInstance()
.bigArrays()
.newLongArray(totalPositions, false);
LongArray valuesBigArray = blockFactory.bigArrays().newLongArray(totalPositions, false);
for (int i = 0; i < values.length; i++) {
valuesBigArray.set(i, values[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,18 @@ public void setup() {
@Benchmark
@OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
public void adaptive() {
MultivalueDedupe.dedupeToBlockAdaptive(block, BlockFactory.getNonBreakingInstance()).close();
MultivalueDedupe.dedupeToBlockAdaptive(block, blockFactory).close();
}

@Benchmark
@OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
public void copyAndSort() {
MultivalueDedupe.dedupeToBlockUsingCopyAndSort(block, BlockFactory.getNonBreakingInstance()).close();
MultivalueDedupe.dedupeToBlockUsingCopyAndSort(block, blockFactory).close();
}

@Benchmark
@OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
public void copyMissing() {
MultivalueDedupe.dedupeToBlockUsingCopyMissing(block, BlockFactory.getNonBreakingInstance()).close();
MultivalueDedupe.dedupeToBlockUsingCopyMissing(block, blockFactory).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private static Operator operator(String data, int topCount) {
ClusterSettings.createBuiltInClusterSettings()
);
return new TopNOperator(
BlockFactory.getNonBreakingInstance(),
blockFactory,
breakerService.getBreaker(CircuitBreaker.REQUEST),
topCount,
elementTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static BlockLoader numericBlockLoader(String name, Where where, NumberFi
@OperationsPerInvocation(INDEX_SIZE)
public void benchmark() {
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
BlockFactory.getNonBreakingInstance(),
blockFactory,
fields(name),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
throw new UnsupportedOperationException("can't load _source here");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefArray;
Expand All @@ -28,11 +27,6 @@ public class BlockFactory {
public static final String MAX_BLOCK_PRIMITIVE_ARRAY_SIZE_SETTING = "esql.block_factory.max_block_primitive_array_size";
public static final ByteSizeValue DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE = ByteSizeValue.ofKb(512);

private static final BlockFactory NON_BREAKING = BlockFactory.getInstance(
new NoopCircuitBreaker("noop-esql-breaker"),
BigArrays.NON_RECYCLING_INSTANCE
);

private final CircuitBreaker breaker;

private final BigArrays bigArrays;
Expand All @@ -54,13 +48,6 @@ public BlockFactory(CircuitBreaker breaker, BigArrays bigArrays, ByteSizeValue m
this.maxPrimitiveArrayBytes = maxPrimitiveArraySize.getBytes();
}

/**
* Returns the Non-Breaking block factory.
*/
public static BlockFactory getNonBreakingInstance() {
return NON_BREAKING;
}

public static BlockFactory getInstance(CircuitBreaker breaker, BigArrays bigArrays) {
return new BlockFactory(breaker, bigArrays, DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.data.TestBlockFactory;
import org.elasticsearch.compute.operator.AggregationOperator;
import org.elasticsearch.compute.operator.CannedSourceOperator;
import org.elasticsearch.compute.operator.Driver;
Expand Down Expand Up @@ -97,7 +98,7 @@ public final void testIgnoresNulls() {
DriverContext driverContext = driverContext();
BlockFactory blockFactory = driverContext.blockFactory();
List<Page> input = CannedSourceOperator.collectPages(simpleInput(blockFactory, end));
List<Page> origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance());
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());

try (
Driver d = new Driver(
Expand All @@ -120,7 +121,7 @@ public final void testMultivalued() {
List<Page> input = CannedSourceOperator.collectPages(
new PositionMergingSourceOperator(simpleInput(driverContext.blockFactory(), end), blockFactory)
);
List<Page> origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance());
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());
assertSimpleOutput(origInput, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator(), driverContext));
}

Expand All @@ -134,7 +135,7 @@ public final void testMultivaluedWithNulls() {
blockFactory
)
);
List<Page> origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance());
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());
assertSimpleOutput(origInput, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator(), driverContext));
}

Expand Down

0 comments on commit 509dfbd

Please sign in to comment.