Skip to content
Permalink
Browse files
ElasticResultRowAsyncIterator should not limit result set to 10k resu…
…lts (#588)

* fix: ElasticResultRowAsyncIterator should not limit result set to 10k results

* minor: improve readability

* TRACK_TOTAL_HITS_DEFAULT back to 10k
  • Loading branch information
fabriziofortino committed Jun 8, 2022
1 parent 588b360 commit 4c59b366dcd779838e1c3ad23fd6021532d13c4a
Showing 3 changed files with 42 additions and 1 deletion.
@@ -63,6 +63,9 @@ public class ElasticIndexDefinition extends IndexDefinition {
public static final String QUERY_FETCH_SIZES = "queryFetchSizes";
public static final Long[] QUERY_FETCH_SIZES_DEFAULT = new Long[]{100L, 1000L};

public static final String TRACK_TOTAL_HITS = "trackTotalHits";
public static final Integer TRACK_TOTAL_HITS_DEFAULT = 10000;

/**
* Hidden property for storing a seed value to be used as suffix in remote index name.
*/
@@ -114,6 +117,7 @@ public class ElasticIndexDefinition extends IndexDefinition {
public final int numberOfShards;
public final int numberOfReplicas;
public final int[] queryFetchSizes;
public final Integer trackTotalHits;

private final Map<String, List<PropertyDefinition>> propertiesByName;
private final List<PropertyDefinition> dynamicBoostProperties;
@@ -134,6 +138,7 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
this.similarityTagsBoost = getOptionalValue(defn, SIMILARITY_TAGS_BOOST, SIMILARITY_TAGS_BOOST_DEFAULT);
this.queryFetchSizes = Arrays.stream(getOptionalValues(defn, QUERY_FETCH_SIZES, Type.LONGS, Long.class, QUERY_FETCH_SIZES_DEFAULT))
.mapToInt(Long::intValue).toArray();
this.trackTotalHits = getOptionalValue(defn, TRACK_TOTAL_HITS, TRACK_TOTAL_HITS_DEFAULT);

this.propertiesByName = getDefinedRules()
.stream()
@@ -22,6 +22,7 @@
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexNode;
@@ -220,6 +221,7 @@ class ElasticQueryScanner {
SearchRequest searchReq = SearchRequest.of(builder -> {
builder
.index(indexNode.getDefinition().getIndexAlias())
.trackTotalHits(thb -> thb.count(indexNode.getDefinition().trackTotalHits))
.sort(sorts)
.source(sourceConfig)
.query(query)
@@ -272,7 +274,11 @@ public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
LOG.debug("Processing search response that took {} to read {}/{} docs", searchResponse.took(), hitsSize, totalHits);
lastHitSortValues = searchHits.get(hitsSize - 1).sort();
scannedRows += hitsSize;
anyDataLeft.set(totalHits > scannedRows);
if (searchResponse.hits().total().relation() == TotalHitsRelation.Eq) {
anyDataLeft.set(totalHits > scannedRows);
} else {
anyDataLeft.set(true);
}
estimator.update(indexPlan.getFilter(), totalHits);

// now that we got the last hit we can release the semaphore to potentially unlock other requests
@@ -325,6 +331,7 @@ private void scan() {
if (semaphore.tryAcquire() && anyDataLeft.get()) {
final SearchRequest searchReq = SearchRequest.of(s -> s
.index(indexNode.getDefinition().getIndexAlias())
.trackTotalHits(thb -> thb.count(indexNode.getDefinition().trackTotalHits))
.sort(sorts)
.source(sourceConfig)
.searchAfter(lastHitSortValues)
@@ -29,6 +29,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.jackrabbit.oak.plugins.index.elastic.ElasticTestUtils.randomString;
@@ -208,4 +209,32 @@ public void indexWithCustomFetchSizes() throws Exception {
assertQuery("select [jcr:path] from [nt:base] where [c] = 'text'", results);
verify(spyMetricHandler, times(1)).markQuery(anyString(), anyBoolean());
}

@Test
public void indexWithLowTrackTotalHits() throws Exception {
BiConsumer<String, Iterable<Long>> buildIndex = (p, fetchSizes) -> {
IndexDefinitionBuilder builder = createIndex(p).noAsync();
builder.getBuilderTree().setProperty("queryFetchSizes", fetchSizes, Type.LONGS);
builder.getBuilderTree().setProperty("trackTotalHits", 10L, Type.LONG);
builder.indexRule("nt:base").property(p).propertyIndex();
setIndex(p + "_" + UUID.randomUUID(), builder);
};

buildIndex.accept("a", Collections.singletonList(10L));
root.commit();

Tree content = root.getTree("/").addChild("content");

List<String> results = IntStream.range(0, 100)
.mapToObj(n -> {
Tree child = content.addChild("child_" + n);
child.setProperty("a", "text");
return "/content/child_" + n;
})
.collect(Collectors.toList());

root.commit();

assertEventually(() -> assertQuery("select [jcr:path] from [nt:base] where [a] = 'text'", results));
}
}

0 comments on commit 4c59b36

Please sign in to comment.