Skip to content

Commit

Permalink
Replace SourceLookup on HitContext with an immutable Source (#90816)
Browse files Browse the repository at this point in the history
Rather than creating a new SourceLookup for each HitContext, and then
setting a source provider on it after the fact, we instead just take a
Source as a constructor argument.

This commit also adds three Source implementations: `fromBytes` and
`fromMap`, which hold pre-loaded data, and `lazyLoading`, which loads
the source only if asked for. It also tidies up FetchSourcePhase to use them.
  • Loading branch information
romseygeek committed Oct 20, 2022
1 parent 8bfb91f commit 350338f
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
Expand Down Expand Up @@ -78,10 +77,9 @@ private BytesReference buildBigExample(String extraText) throws IOException {
}

@Benchmark
public BytesReference filterObjects() throws IOException {
SourceLookup lookup = new SourceLookup(new SourceLookup.BytesSourceProvider(sourceBytes));
Object value = lookup.filter(fetchContext);
return FetchSourcePhase.objectToBytes(value, XContentType.JSON, Math.min(1024, lookup.internalSourceRef().length()));
/**
 * Benchmark body: filters the raw source bytes through the configured
 * fetch context, then re-serializes the filtered result back to bytes.
 */
public BytesReference filterObjects() {
    Source original = Source.fromBytes(sourceBytes);
    var filtered = original.filter(fetchContext);
    return Source.fromMap(filtered, original.sourceContentType()).internalSourceRef();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
Expand Down Expand Up @@ -131,7 +131,7 @@ public BytesReference filterWithMap() throws IOException {
excludes = filters.toArray(Strings.EMPTY_ARRAY);
}
Map<String, Object> filterMap = XContentMapValues.filter(sourceMap, includes, excludes);
return FetchSourcePhase.objectToBytes(filterMap, XContentType.JSON, Math.min(1024, source.length()));
return Source.fromMap(filterMap, XContentType.JSON).internalSourceRef();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.search.lookup.Source;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -85,9 +85,9 @@ public void process(HitContext hit) throws IOException {
HitContext subContext = new HitContext(
new SearchHit(slot, "unknown", Collections.emptyMap(), Collections.emptyMap()),
percolatorLeafReaderContext,
slot
slot,
Source.fromBytes(document)
);
subContext.setSourceLookup(new SourceLookup(new SourceLookup.BytesSourceProvider(document)));
// force source because MemoryIndex does not store fields
SearchHighlightContext highlight = new SearchHighlightContext(fetchContext.highlight().fields(), true);
FetchSubPhaseProcessor processor = highlightPhase.getProcessor(fetchContext, highlight, query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase.HitContext;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
Expand All @@ -52,7 +53,7 @@ public void testHitsExecute() throws Exception {
LeafReaderContext context = reader.leaves().get(0);
// A match:
{
HitContext hit = new HitContext(new SearchHit(0), context, 0);
HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
MemoryIndex memoryIndex = new MemoryIndex();
memoryIndex.addField("field", "value", new WhitespaceAnalyzer());
Expand Down Expand Up @@ -80,7 +81,7 @@ public void testHitsExecute() throws Exception {

// No match:
{
HitContext hit = new HitContext(new SearchHit(0), context, 0);
HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
MemoryIndex memoryIndex = new MemoryIndex();
memoryIndex.addField("field", "value1", new WhitespaceAnalyzer());
Expand All @@ -107,7 +108,7 @@ public void testHitsExecute() throws Exception {

// No query:
{
HitContext hit = new HitContext(new SearchHit(0), context, 0);
HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
PercolateQuery.QueryStore queryStore = ctx -> docId -> null;
MemoryIndex memoryIndex = new MemoryIndex();
memoryIndex.addField("field", "value", new WhitespaceAnalyzer());
Expand Down
47 changes: 26 additions & 21 deletions server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand Down Expand Up @@ -42,6 +41,7 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -292,7 +293,8 @@ private static HitContext prepareNonNestedHitContext(

if (leafStoredFieldLoader.id() == null) {
SearchHit hit = new SearchHit(docId, null, null, null);
return new HitContext(hit, subReaderContext, subDocId);
Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
return new HitContext(hit, subReaderContext, subDocId, source);
} else {
SearchHit hit;
if (leafStoredFieldLoader.storedFields().isEmpty() == false) {
Expand All @@ -304,32 +306,37 @@ private static HitContext prepareNonNestedHitContext(
hit = new SearchHit(docId, leafStoredFieldLoader.id(), emptyMap(), emptyMap());
}

HitContext hitContext = new HitContext(hit, subReaderContext, subDocId);
BytesReference source;
Source source;
if (sourceRequired(context)) {
try {
profiler.startLoadingSource();
source = sourceLoader.source(leafStoredFieldLoader, subDocId);
source = Source.fromBytes(sourceLoader.source(leafStoredFieldLoader, subDocId));
SourceLookup scriptSourceLookup = context.getSearchExecutionContext().lookup().source();
scriptSourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
scriptSourceLookup.setSourceProvider(new SourceLookup.BytesSourceProvider(source.internalSourceRef()));
} finally {
profiler.stopLoadingSource();
}
} else {
source = null;
source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
}
if (source != null) {
// Store the loaded source on the hit context so that fetch subphases can access it.
// Also make it available to scripts by storing it on the shared SearchLookup instance.
SourceLookup.BytesSourceProvider sourceBytes = new SourceLookup.BytesSourceProvider(source);
hitContext.setSourceLookup(new SourceLookup(sourceBytes));

SourceLookup scriptSourceLookup = context.getSearchExecutionContext().lookup().source();
scriptSourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
scriptSourceLookup.setSourceProvider(sourceBytes);
}
return hitContext;
return new HitContext(hit, subReaderContext, subDocId, source);
}
}

/**
 * Builds a supplier that loads the {@code _source} from stored fields only
 * when first invoked. Used for hits where the fetch phase did not eagerly
 * load the source, so the cost is paid only if a consumer actually asks.
 */
private static Supplier<Source> lazyStoredSourceLoader(Profiler profiler, LeafReaderContext ctx, int doc) {
    return () -> {
        StoredFieldLoader storedFields = profiler.storedFields(StoredFieldLoader.create(true, Collections.emptySet()));
        LeafStoredFieldLoader leafLoader = storedFields.getLoader(ctx, null);
        try {
            leafLoader.advanceTo(doc);
        } catch (IOException ioe) {
            // Supplier.get() cannot throw checked exceptions, so surface I/O failures unchecked
            throw new UncheckedIOException(ioe);
        }
        return Source.fromBytes(leafLoader.source());
    };
}

/**
* Resets the provided {@link HitContext} with information on the current
* nested document. This includes the following:
Expand Down Expand Up @@ -396,7 +403,6 @@ private static HitContext prepareNestedHitContext(
SearchHit.NestedIdentity nestedIdentity = nestedInfo.nestedIdentity();

SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity, docFields, metaFields);
HitContext hitContext = new HitContext(hit, subReaderContext, nestedInfo.doc());

if (rootSourceAsMap != null && rootSourceAsMap.isEmpty() == false) {
// Isolate the nested json array object that matches with nested hit and wrap it back into the same json
Expand All @@ -421,10 +427,9 @@ private static HitContext prepareNestedHitContext(
current = next;
}
}

hitContext.setSourceLookup(new SourceLookup(new SourceLookup.MapSourceProvider(nestedSourceAsMap, rootSourceContentType)));
return new HitContext(hit, subReaderContext, nestedInfo.doc(), Source.fromMap(nestedSourceAsMap, rootSourceContentType));
}
return hitContext;
return new HitContext(hit, subReaderContext, nestedInfo.doc(), Source.EMPTY);
}

public static List<Object> processStoredField(Function<String, MappedFieldType> fieldTypeLookup, String field, List<Object> input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.index.ReaderUtil;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceLookup;

import java.io.IOException;

Expand All @@ -26,14 +25,13 @@ class HitContext {
private final SearchHit hit;
private final LeafReaderContext readerContext;
private final int docId;
private SourceLookup sourceLookup;
private final Source source;

public HitContext(SearchHit hit, LeafReaderContext context, int docId) {
public HitContext(SearchHit hit, LeafReaderContext context, int docId, Source source) {
this.hit = hit;
this.readerContext = context;
this.docId = docId;
this.sourceLookup = new SourceLookup(new SourceLookup.ReaderSourceProvider());
sourceLookup.setSegmentAndDocument(context, docId);
this.source = source;
}

public SearchHit hit() {
Expand Down Expand Up @@ -63,11 +61,7 @@ public int docId() {
* {@link FetchPhase}. The returned {@code Source} contains the preloaded source.
*/
public Source source() {
return sourceLookup;
}

public void setSourceLookup(SourceLookup source) {
this.sourceLookup = source;
return source;
}

public IndexReader topLevelReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,12 @@
package org.elasticsearch.search.fetch.subphase;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;

public final class FetchSourcePhase implements FetchSubPhase {
Expand Down Expand Up @@ -70,13 +64,7 @@ private void hitExecute(FetchSourceContext fetchSourceContext, HitContext hitCon
if (nestedHit) {
value = getNestedSource(value, hitContext);
}

try {
final int initialCapacity = nestedHit ? 1024 : Math.min(1024, source.internalSourceRef().length());
hitContext.hit().sourceRef(objectToBytes(value, source.sourceContentType(), initialCapacity));
} catch (IOException e) {
throw new ElasticsearchException("Error filtering source", e);
}
hitContext.hit().sourceRef(Source.fromMap(value, source.sourceContentType()).internalSourceRef());
}

@Override
Expand All @@ -90,23 +78,6 @@ private static boolean containsFilters(FetchSourceContext context) {
return context.includes().length != 0 || context.excludes().length != 0;
}

/**
 * Serializes {@code value} into a {@link BytesReference} using the given
 * {@link XContentType}. A {@code null} value is rendered as an empty top-level
 * object ({@code {}}) rather than a bare null — see the inline comment below.
 *
 * @param value           the object to serialize; may be {@code null}
 * @param xContentType    the xcontent format to write
 * @param initialCapacity size hint for the output buffer
 * @return the serialized bytes
 * @throws IOException if writing the xcontent fails
 */
public static BytesReference objectToBytes(Object value, XContentType xContentType, int initialCapacity) throws IOException {
    BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
    XContentBuilder builder = new XContentBuilder(xContentType.xContent(), streamOutput);
    if (value != null) {
        builder.value(value);
    } else {
        // This happens if the source filtering could not find the specified path in the _source.
        // Just doing `builder.value(null)` is valid, but the xcontent validation can't detect what format
        // it is. In certain cases, for example response serialization, we fail if the xcontent type can't
        // be detected. So instead we just return an empty top level object. This is also in line with what
        // was being returned in this situation in 5.x and earlier.
        builder.startObject();
        builder.endObject();
    }
    return BytesReference.bytes(builder);
}

@SuppressWarnings("unchecked")
private static Map<String, Object> getNestedSource(Map<String, Object> sourceAsMap, HitContext hitContext) {
for (SearchHit.NestedIdentity o = hitContext.hit().getNestedIdentity(); o != null; o = o.getChild()) {
Expand Down

0 comments on commit 350338f

Please sign in to comment.