Skip to content

Commit

Permalink
ESQL: Remove default driver context (#99573)
Browse files Browse the repository at this point in the history
This commit removes the default driver context implementation.
  • Loading branch information
ChrisHegarty committed Sep 14, 2023
1 parent 840d49a commit 07f6a65
Show file tree
Hide file tree
Showing 44 changed files with 283 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static Operator operator(String grouping, String op, String dataType) {
return new HashAggregationOperator(
List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
() -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024, false),
new DriverContext()
new DriverContext(BigArrays.NON_RECYCLING_INSTANCE)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.benchmark.compute.operator;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
Expand Down Expand Up @@ -80,14 +81,14 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
return switch (operation) {
case "abs" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(new Abs(Source.EMPTY, longField), layout(longField)).get(new DriverContext());
yield EvalMapper.toEvaluator(new Abs(Source.EMPTY, longField), layout(longField)).get(driverContext());
}
case "add" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(
new Add(Source.EMPTY, longField, new Literal(Source.EMPTY, 1L, DataTypes.LONG)),
layout(longField)
).get(new DriverContext());
).get(driverContext());
}
case "date_trunc" -> {
FieldAttribute timestamp = new FieldAttribute(
Expand All @@ -98,28 +99,28 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
yield EvalMapper.toEvaluator(
new DateTrunc(Source.EMPTY, new Literal(Source.EMPTY, Duration.ofHours(24), EsqlDataTypes.TIME_DURATION), timestamp),
layout(timestamp)
).get(new DriverContext());
).get(driverContext());
}
case "equal_to_const" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(
new Equals(Source.EMPTY, longField, new Literal(Source.EMPTY, 100_000L, DataTypes.LONG)),
layout(longField)
).get(new DriverContext());
).get(driverContext());
}
case "long_equal_to_long" -> {
FieldAttribute lhs = longField();
FieldAttribute rhs = longField();
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(new DriverContext());
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext());
}
case "long_equal_to_int" -> {
FieldAttribute lhs = longField();
FieldAttribute rhs = intField();
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(new DriverContext());
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext());
}
case "mv_min", "mv_min_ascending" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(new MvMin(Source.EMPTY, longField), layout(longField)).get(new DriverContext());
yield EvalMapper.toEvaluator(new MvMin(Source.EMPTY, longField), layout(longField)).get(driverContext());
}
default -> throw new UnsupportedOperationException();
};
Expand Down Expand Up @@ -259,4 +260,8 @@ private static void run(String operation) {
checkExpected(operation, output);
}
}

static DriverContext driverContext() {
return new DriverContext(BigArrays.NON_RECYCLING_INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,13 @@
*/
public class DriverContext {

/** A default driver context. The returned bigArrays is non recycling. */
public static DriverContext DEFAULT = new DriverContext(BigArrays.NON_RECYCLING_INSTANCE);

// Working set. Only the thread executing the driver will update this set.
Set<Releasable> workingSet = Collections.newSetFromMap(new IdentityHashMap<>());

private final AtomicReference<Snapshot> snapshot = new AtomicReference<>();

private final BigArrays bigArrays;

// For testing
public DriverContext() {
this(BigArrays.NON_RECYCLING_INSTANCE);
}

public DriverContext(BigArrays bigArrays) {
Objects.requireNonNull(bigArrays);
this.bigArrays = bigArrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public Operator get(DriverContext driverContext) {

@Override
public String describe() {
return "EvalOperator[evaluator=" + evaluator.get(DriverContext.DEFAULT) + "]";
return "EvalOperator[evaluator=" + evaluator.get(new ThrowingDriverContext()) + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Operator get(DriverContext driverContext) {

@Override
public String describe() {
return "FilterOperator[evaluator=" + evaluatorSupplier.get(DriverContext.DEFAULT) + "]";
return "FilterOperator[evaluator=" + evaluatorSupplier.get(new ThrowingDriverContext()) + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.FloatArray;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;

public class ThrowingDriverContext extends DriverContext {
public ThrowingDriverContext() {
super(new ThrowingBigArrays());
}

@Override
public BigArrays bigArrays() {
throw new AssertionError("should not reach here");
}

@Override
public boolean addReleasable(Releasable releasable) {
throw new AssertionError("should not reach here");
}

static class ThrowingBigArrays extends BigArrays {

ThrowingBigArrays() {
super(null, null, "fake");
}

@Override
public ByteArray newByteArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public IntArray newIntArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public LongArray newLongArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public FloatArray newFloatArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public DoubleArray newDoubleArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void testQueryOperator() throws IOException {
assertTrue("duplicated docId=" + docId, actualDocIds.add(docId));
}
});
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
}
OperatorTestCase.runDriver(drivers);
Expand Down Expand Up @@ -144,9 +145,10 @@ public void testQueryOperator() throws IOException {
}
}

// @Repeat(iterations = 1)
public void testGroupingWithOrdinals() throws Exception {
final String gField = "g";
final int numDocs = between(100, 10000);
final int numDocs = 2856; // between(100, 10000);
final Map<BytesRef, Long> expectedCounts = new HashMap<>();
int keyLength = randomIntBetween(1, 10);
try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
Expand Down Expand Up @@ -210,7 +212,7 @@ public String toString() {
};

try (DirectoryReader reader = writer.getReader()) {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();

Driver driver = new Driver(
driverContext,
Expand Down Expand Up @@ -258,14 +260,18 @@ public String toString() {
LongBlock counts = page.getBlock(1);
for (int i = 0; i < keys.getPositionCount(); i++) {
BytesRef spare = new BytesRef();
actualCounts.put(keys.getBytesRef(i, spare), counts.getLong(i));
keys.getBytesRef(i, spare);
actualCounts.put(BytesRef.deepCopyOf(spare), counts.getLong(i));
}
// System.out.println("HEGO: keys.getPositionCount=" + keys.getPositionCount());
// Releasables.close(keys);
}),
() -> {}
);
OperatorTestCase.runDriver(driver);
assertThat(actualCounts, equalTo(expectedCounts));
assertDriverContext(driverContext);
org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased();
}
}
}
Expand All @@ -276,7 +282,7 @@ public void testLimitOperator() {
var values = randomList(positions, positions, ESTestCase::randomLong);

var results = new ArrayList<Long>();
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
var driver = new Driver(
driverContext,
Expand Down Expand Up @@ -388,6 +394,13 @@ private BigArrays bigArrays() {
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}

/**
* A {@link DriverContext} that won't throw {@link CircuitBreakingException}.
*/
protected final DriverContext driverContext() {
return new DriverContext(bigArrays());
}

public static void assertDriverContext(DriverContext driverContext) {
assertTrue(driverContext.isFinished());
assertThat(driverContext.getSnapshot().releasables(), empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public final void testIgnoresNulls() {
int end = between(1_000, 100_000);
List<Page> results = new ArrayList<>();
List<Page> input = CannedSourceOperator.collectPages(simpleInput(end));
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();

try (
Driver d = new Driver(
Expand All @@ -110,30 +110,30 @@ public final void testIgnoresNulls() {

public final void testMultivalued() {
int end = between(1_000, 100_000);
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> input = CannedSourceOperator.collectPages(new PositionMergingSourceOperator(simpleInput(end)));
assertSimpleOutput(input, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator()));
}

public final void testMultivaluedWithNulls() {
int end = between(1_000, 100_000);
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> input = CannedSourceOperator.collectPages(
new NullInsertingSourceOperator(new PositionMergingSourceOperator(simpleInput(end)))
);
assertSimpleOutput(input, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator()));
}

public final void testEmptyInput() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), List.<Page>of().iterator());

assertThat(results, hasSize(1));
assertOutputFromEmpty(results.get(0).getBlock(0));
}

public final void testEmptyInputInitialFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand All @@ -147,7 +147,7 @@ public final void testEmptyInputInitialFinal() {
}

public final void testEmptyInputInitialIntermediateFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void assertOutputFromEmpty(Block b) {
}

public void testRejectsDouble() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
Driver d = new Driver(
driverContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void assertOutputFromEmpty(Block b) {
}

public void testRejectsDouble() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
Driver d = new Driver(
driverContext,
Expand Down

0 comments on commit 07f6a65

Please sign in to comment.