ESQL: Add query and index to the source status (#103284)
This adds the query and index to the status of all of the "lucene
source" operators. Because one of these operators can run against
multiple indices and multiple queries, both are reported as lists:
```
{
  "operator" : "LuceneSourceOperator[maxPageSize=10, remainingDocs=330]",
  "status" : {
    "processed_slices" : 1,
    "processed_queries" : [
      "*:*"
    ],
    "processed_shards" : [
      "tsdb:0"
    ],
    "slice_index" : 1,
    "total_slices" : 4,
    "pages_emitted" : 17,
    "slice_min" : 0,
    "slice_max" : 157489,
    "current" : 170
  }
},
```
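For context, one illustrative way to see this status while a query is
running is the task management API. The action filter below is my
assumption about the ESQL task name, not something this change adds:
```
GET _tasks?detailed=true&actions=*data/read/esql*
```
`detailed=true` asks each task to render its status, which for ESQL
drivers includes one entry per operator, like the example above.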
nik9000 committed Dec 27, 2023 · 1 parent 953b7ba · commit b6f4afd
Showing 11 changed files with 203 additions and 90 deletions.
TransportVersions.java
@@ -173,6 +173,7 @@ static TransportVersion def(int id) {
public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0);
public static final TransportVersion TEXT_EXPANSION_TOKEN_PRUNING_CONFIG_ADDED = def(8_562_00_0);
public static final TransportVersion ESQL_ASYNC_QUERY = def(8_563_00_0);
public static final TransportVersion ESQL_STATUS_INCLUDE_LUCENE_QUERIES = def(8_564_00_0);

/*
* STOP! READ THIS FIRST! No, really,

LuceneOperator.java
@@ -16,6 +16,7 @@
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -30,8 +31,13 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class LuceneOperator extends SourceOperator {
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
@@ -40,10 +46,16 @@ public abstract class LuceneOperator extends SourceOperator {

protected final BlockFactory blockFactory;

private int processSlices;
/**
* Count of the number of slices processed.
*/
private int processedSlices;
final int maxPageSize;
private final LuceneSliceQueue sliceQueue;

private final Set<Query> processedQueries = new HashSet<>();
private final Set<String> processedShards = new HashSet<>();

private LuceneSlice currentSlice;
private int sliceIndex;

@@ -52,7 +64,7 @@ public abstract class LuceneOperator extends SourceOperator {
int pagesEmitted;
boolean doneCollecting;

public LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.sliceQueue = sliceQueue;
@@ -73,18 +85,23 @@ LuceneScorer getCurrentOrLoadNextScorer() {
if (currentSlice == null) {
doneCollecting = true;
return null;
} else {
processSlices++;
}
if (currentSlice.numLeaves() == 0) {
continue;
}
processedSlices++;
processedShards.add(
currentSlice.searchContext().getSearchExecutionContext().getFullyQualifiedIndex().getName()
+ ":"
+ currentSlice.searchContext().getSearchExecutionContext().getShardId()
);
}
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
logger.trace("Starting {}", partialLeaf);
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
final Weight weight = currentSlice.weight().get();
processedQueries.add(weight.getQuery());
currentScorer = new LuceneScorer(currentSlice.shardIndex(), currentSlice.searchContext(), weight, leaf);
}
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
@@ -190,6 +207,8 @@ public static class Status implements Operator.Status {
);

private final int processedSlices;
private final Set<String> processedQueries;
private final Set<String> processedShards;
private final int totalSlices;
private final int pagesEmitted;
private final int sliceIndex;
@@ -198,7 +217,9 @@ public static class Status implements Operator.Status {
private final int current;

private Status(LuceneOperator operator) {
processedSlices = operator.processSlices;
processedSlices = operator.processedSlices;
processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
processedShards = new TreeSet<>(operator.processedShards);
sliceIndex = operator.sliceIndex;
totalSlices = operator.sliceQueue.totalSlices();
LuceneSlice slice = operator.currentSlice;
@@ -219,8 +240,20 @@ private Status(LuceneOperator operator) {
pagesEmitted = operator.pagesEmitted;
}

Status(int processedSlices, int sliceIndex, int totalSlices, int pagesEmitted, int sliceMin, int sliceMax, int current) {
Status(
int processedSlices,
Set<String> processedQueries,
Set<String> processedShards,
int sliceIndex,
int totalSlices,
int pagesEmitted,
int sliceMin,
int sliceMax,
int current
) {
this.processedSlices = processedSlices;
this.processedQueries = processedQueries;
this.processedShards = processedShards;
this.sliceIndex = sliceIndex;
this.totalSlices = totalSlices;
this.pagesEmitted = pagesEmitted;
@@ -231,6 +264,13 @@ private Status(LuceneOperator operator) {

Status(StreamInput in) throws IOException {
processedSlices = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_STATUS_INCLUDE_LUCENE_QUERIES)) {
processedQueries = in.readCollectionAsSet(StreamInput::readString);
processedShards = in.readCollectionAsSet(StreamInput::readString);
} else {
processedQueries = Collections.emptySet();
processedShards = Collections.emptySet();
}
sliceIndex = in.readVInt();
totalSlices = in.readVInt();
pagesEmitted = in.readVInt();
@@ -242,6 +282,10 @@ private Status(LuceneOperator operator) {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processedSlices);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_STATUS_INCLUDE_LUCENE_QUERIES)) {
out.writeCollection(processedQueries, StreamOutput::writeString);
out.writeCollection(processedShards, StreamOutput::writeString);
}
out.writeVInt(sliceIndex);
out.writeVInt(totalSlices);
out.writeVInt(pagesEmitted);
@@ -259,6 +303,14 @@ public int processedSlices() {
return processedSlices;
}

public Set<String> processedQueries() {
return processedQueries;
}

public Set<String> processedShards() {
return processedShards;
}

public int sliceIndex() {
return sliceIndex;
}
@@ -287,6 +339,8 @@ public int current() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("processed_slices", processedSlices);
builder.field("processed_queries", processedQueries);
builder.field("processed_shards", processedShards);
builder.field("slice_index", sliceIndex);
builder.field("total_slices", totalSlices);
builder.field("pages_emitted", pagesEmitted);
@@ -302,6 +356,8 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return processedSlices == status.processedSlices
&& processedQueries.equals(status.processedQueries)
&& processedShards.equals(status.processedShards)
&& sliceIndex == status.sliceIndex
&& totalSlices == status.totalSlices
&& pagesEmitted == status.pagesEmitted
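An aside on the wire-compatibility pattern in `Status(StreamInput)` and `writeTo` above: the new fields go on the wire only when the peer's transport version is at least ESQL_STATUS_INCLUDE_LUCENE_QUERIES, and the reader substitutes an empty default for older peers. Here is a minimal, self-contained sketch of the same idea in plain `java.io` (the names and version constant mirror the diff; everything else is hypothetical, not Elasticsearch's actual wire code):
```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

class StatusWireSketch {
    // Mirrors ESQL_STATUS_INCLUDE_LUCENE_QUERIES = def(8_564_00_0) from the first hunk.
    static final int ESQL_STATUS_INCLUDE_LUCENE_QUERIES = 8_564_00_0;

    // Write side: emit the new field only when the receiving node understands it.
    static void writeStatus(DataOutputStream out, int peerVersion, int processedSlices, Set<String> processedQueries)
        throws IOException {
        out.writeInt(processedSlices);
        if (peerVersion >= ESQL_STATUS_INCLUDE_LUCENE_QUERIES) {
            out.writeInt(processedQueries.size());
            for (String query : processedQueries) {
                out.writeUTF(query);
            }
        }
    }

    // Read side: an older peer never sent the field, so fall back to an empty set.
    static Set<String> readProcessedQueries(DataInputStream in, int peerVersion) throws IOException {
        if (peerVersion < ESQL_STATUS_INCLUDE_LUCENE_QUERIES) {
            return Collections.emptySet();
        }
        int count = in.readInt();
        Set<String> queries = new TreeSet<>(); // sorted for stable rendering, like the TreeSet in Status
        for (int i = 0; i < count; i++) {
            queries.add(in.readUTF());
        }
        return queries;
    }
}
```
Gating both sides on the same version constant is what keeps mixed-version clusters working during a rolling upgrade.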
LuceneCountOperatorTests.java
@@ -11,7 +11,6 @@
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
@@ -27,15 +26,11 @@
import org.elasticsearch.compute.operator.OperatorTestCase;
import org.elasticsearch.compute.operator.TestResultPageSinkOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.indices.CrankyCircuitBreakerService;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.junit.After;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,7 +39,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class LuceneCountOperatorTests extends AnyOperatorTestCase {
@@ -89,10 +83,8 @@ private LuceneCountOperator.Factory simple(BigArrays bigArrays, DataPartitioning
throw new RuntimeException(e);
}

SearchContext ctx = mockSearchContext(reader);
SearchExecutionContext ectx = mock(SearchExecutionContext.class);
when(ctx.getSearchExecutionContext()).thenReturn(ectx);
when(ectx.getIndexReader()).thenReturn(reader);
SearchContext ctx = LuceneSourceOperatorTests.mockSearchContext(reader);
when(ctx.getSearchExecutionContext().getIndexReader()).thenReturn(reader);
final Query query;
if (enableShortcut && randomBoolean()) {
query = new MatchAllDocsQuery();
@@ -185,25 +177,4 @@ private void testCount(Supplier<DriverContext> contexts, int size, int limit) {
assertThat(totalCount, equalTo((long) size));
}
}

/**
* Creates a mock search context with the given index reader.
* The returned mock search context can be used to test with {@link LuceneOperator}.
*/
public static SearchContext mockSearchContext(IndexReader reader) {
try {
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
TrivialQueryCachingPolicy.NEVER,
true
);
SearchContext searchContext = mock(SearchContext.class);
when(searchContext.searcher()).thenReturn(searcher);
return searchContext;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
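The `mockSearchContext` helper removed above moves to `LuceneSourceOperatorTests`, one of the 11 changed files but not shown in this excerpt. A hedged guess at the relocated version, based on the removed body: the chained stubbing `when(ctx.getSearchExecutionContext().getIndexReader())` in `simple(...)` only works if intermediate calls on the mock return mocks too, e.g. via Mockito's deep stubs:
```java
// Hypothetical reconstruction; the actual relocated helper is not in this excerpt.
// Assumes static imports of Mockito.mock, Mockito.when, and Mockito.RETURNS_DEEP_STUBS.
public static SearchContext mockSearchContext(IndexReader reader) {
    try {
        ContextIndexSearcher searcher = new ContextIndexSearcher(
            reader,
            IndexSearcher.getDefaultSimilarity(),
            IndexSearcher.getDefaultQueryCache(),
            TrivialQueryCachingPolicy.NEVER,
            true
        );
        // Deep stubs let callers stub chained calls such as
        // ctx.getSearchExecutionContext().getIndexReader().
        SearchContext searchContext = mock(SearchContext.class, RETURNS_DEEP_STUBS);
        when(searchContext.searcher()).thenReturn(searcher);
        return searchContext;
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
```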
LuceneSourceOperatorStatusTests.java
@@ -12,20 +12,39 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import static org.hamcrest.Matchers.equalTo;

public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTestCase<LuceneSourceOperator.Status> {
public static LuceneSourceOperator.Status simple() {
return new LuceneSourceOperator.Status(0, 0, 1, 5, 123, 99990, 8000);
return new LuceneSourceOperator.Status(2, Set.of("*:*"), new TreeSet<>(List.of("a:0", "a:1")), 0, 1, 5, 123, 99990, 8000);
}

public static String simpleToJson() {
return """
{"processed_slices":0,"slice_index":0,"total_slices":1,"pages_emitted":5,"slice_min":123,"slice_max":99990,"current":8000}""";
{
"processed_slices" : 2,
"processed_queries" : [
"*:*"
],
"processed_shards" : [
"a:0",
"a:1"
],
"slice_index" : 0,
"total_slices" : 1,
"pages_emitted" : 5,
"slice_min" : 123,
"slice_max" : 99990,
"current" : 8000
}""";
}

public void testToXContent() {
assertThat(Strings.toString(simple()), equalTo(simpleToJson()));
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
}

@Override
@@ -37,6 +56,8 @@ protected Writeable.Reader<LuceneSourceOperator.Status> instanceReader() {
public LuceneSourceOperator.Status createTestInstance() {
return new LuceneSourceOperator.Status(
randomNonNegativeInt(),
randomProcessedQueries(),
randomProcessedShards(),
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt(),
@@ -46,26 +67,58 @@ public LuceneSourceOperator.Status createTestInstance() {
);
}

private static Set<String> randomProcessedQueries() {
int size = between(0, 10);
Set<String> set = new TreeSet<>();
while (set.size() < size) {
set.add(randomAlphaOfLength(5));
}
return set;
}

private static Set<String> randomProcessedShards() {
int size = between(0, 10);
Set<String> set = new TreeSet<>();
while (set.size() < size) {
set.add(randomAlphaOfLength(3) + ":" + between(0, 10));
}
return set;
}

@Override
protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) {
int processedSlices = instance.processedSlices();
Set<String> processedQueries = instance.processedQueries();
Set<String> processedShards = instance.processedShards();
int sliceIndex = instance.sliceIndex();
int totalSlices = instance.totalSlices();
int pagesEmitted = instance.pagesEmitted();
int sliceMin = instance.sliceMin();
int sliceMax = instance.sliceMax();
int current = instance.current();
switch (between(0, 6)) {
switch (between(0, 8)) {
case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
case 1 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
case 2 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
case 4 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
case 5 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
case 6 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries);
case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards);
case 3 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
case 4 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
case 5 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
case 6 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
case 7 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
case 8 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
default -> throw new UnsupportedOperationException();
}
return new LuceneSourceOperator.Status(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
return new LuceneSourceOperator.Status(
processedSlices,
processedQueries,
processedShards,
sliceIndex,
totalSlices,
pagesEmitted,
sliceMin,
sliceMax,
current
);
}
}
