Skip to content

Commit

Permalink
Add a stored fields loader (#89657)
Browse files Browse the repository at this point in the history
To load stored fields we currently directly use lucene's FieldVisitor API. There are
a number of nice shortcuts and some dodgy hacks that get better performance here,
which need to be reproduced everywhere the code uses this API. It is also confusingly
stateful, and not especially easy to use.

This commit adds a new StoredFieldLoader abstraction which exposes a per-leaf stored
field loader as an iterator, and adapts ShardGetService and FetchPhase to use this new
abstraction in place of field visitors. Further work to integrate source loaders will be
done in followup commits.
  • Loading branch information
romseygeek committed Aug 30, 2022
1 parent e891909 commit ce63ee5
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public Set<String> getFieldNames() {

@Override
public Status needsField(FieldInfo fieldInfo) {
if (fields.isEmpty()) {
return super.needsField(fieldInfo);
}
if (super.needsField(fieldInfo) == Status.YES) {
return Status.YES;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.fieldvisitor;

import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Loads stored fields for a LeafReader
*
* Which stored fields to load will be configured by the loader's parent
* {@link StoredFieldLoader}
*/
public interface LeafStoredFieldLoader {

/**
* Advance the reader to a document. This should be idempotent.
*/
void advanceTo(int doc) throws IOException;

/**
* @return the source for the current document
*/
BytesReference source();

/**
* @return the ID for the current document
*/
String id();

/**
* @return the routing path for the current document
*/
String routing();

/**
* @return stored fields for the current document
*/
Map<String, List<Object>> storedFields();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.fieldvisitor;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Generates a {@link LeafStoredFieldLoader} for a given lucene segment to load stored fields.
*/
public abstract class StoredFieldLoader {

/**
* Return a {@link LeafStoredFieldLoader} for the given segment and document set
*
* The loader will use an internal lucene merge reader if the document set is of
* sufficient size and is contiguous. Callers may pass {@code null} if the set
* is not known up front or if the merge reader optimisation will not apply.
*/
public abstract LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs);

/**
* @return a list of fields that will be loaded for each document
*/
public abstract List<String> fieldsToLoad();

/**
* Creates a new StoredFieldLoader
* @param loadSource should this loader load the _source field
* @param fields a set of additional fields the loader should load
*/
public static StoredFieldLoader create(boolean loadSource, Set<String> fields) {
List<String> fieldsToLoad = fieldsToLoad(loadSource, fields);
return new StoredFieldLoader() {
@Override
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) {
return new ReaderStoredFieldLoader(reader(ctx, docs), loadSource, fields);
}

@Override
public List<String> fieldsToLoad() {
return fieldsToLoad;
}
};
}

/**
* Creates a no-op StoredFieldLoader that will not load any fields from disk
*/
public static StoredFieldLoader empty() {
return new StoredFieldLoader() {
@Override
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) {
return new EmptyStoredFieldLoader();
}

@Override
public List<String> fieldsToLoad() {
return List.of();
}
};
}

private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(LeafReaderContext ctx, int[] docs) {
LeafReader leafReader = ctx.reader();
if (docs == null) {
return leafReader::document;
}
if (leafReader instanceof SequentialStoredFieldsLeafReader lf && docs.length > 10 && hasSequentialDocs(docs)) {
return lf.getSequentialStoredFieldsReader()::visitDocument;
}
return leafReader::document;
}

private static List<String> fieldsToLoad(boolean loadSource, Set<String> fields) {
Set<String> fieldsToLoad = new HashSet<>();
fieldsToLoad.add("_id");
fieldsToLoad.add("_routing");
if (loadSource) {
fieldsToLoad.add("_source");
}
fieldsToLoad.addAll(fields);
return fieldsToLoad.stream().sorted().toList();
}

private static boolean hasSequentialDocs(int[] docs) {
return docs.length > 0 && docs[docs.length - 1] - docs[0] == docs.length - 1;
}

private static class EmptyStoredFieldLoader implements LeafStoredFieldLoader {

@Override
public void advanceTo(int doc) throws IOException {}

@Override
public BytesReference source() {
return null;
}

@Override
public String id() {
return null;
}

@Override
public String routing() {
return null;
}

@Override
public Map<String, List<Object>> storedFields() {
return Collections.emptyMap();
}
}

private static class ReaderStoredFieldLoader implements LeafStoredFieldLoader {

private final CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader;
private final CustomFieldsVisitor visitor;
private int doc = -1;

ReaderStoredFieldLoader(CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader, boolean loadSource, Set<String> fields) {
this.reader = reader;
this.visitor = new CustomFieldsVisitor(fields, loadSource);
}

@Override
public void advanceTo(int doc) throws IOException {
if (doc != this.doc) {
visitor.reset();
reader.accept(doc, visitor);
this.doc = doc;
}
}

@Override
public BytesReference source() {
return visitor.source();
}

@Override
public String id() {
return visitor.id();
}

@Override
public String routing() {
return visitor.routing();
}

@Override
public Map<String, List<Object>> storedFields() {
return visitor.fields();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
Expand All @@ -30,6 +30,7 @@
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
Expand Down Expand Up @@ -240,34 +241,34 @@ private GetResult innerGetFetch(

Map<String, DocumentField> documentFields = null;
Map<String, DocumentField> metadataFields = null;
BytesReference source = null;
BytesReference source;
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
SourceLoader loader = forceSyntheticSource
? new SourceLoader.Synthetic(mappingLookup.getMapping())
: mappingLookup.newSourceLoader();
FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext, loader);
if (fieldVisitor != null) {
try {
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
} catch (IOException e) {
throw new ElasticsearchException("Failed to get id [" + id + "]", e);
}
StoredFieldLoader storedFieldLoader = buildStoredFieldLoader(storedFields, fetchSourceContext, loader);
LeafStoredFieldLoader leafStoredFieldLoader = storedFieldLoader.getLoader(docIdAndVersion.reader.getContext(), null);
try {
leafStoredFieldLoader.advanceTo(docIdAndVersion.docId);
} catch (IOException e) {
throw new ElasticsearchException("Failed to get id [" + id + "]", e);
}

// put stored fields into result objects
if (fieldVisitor.fields().isEmpty() == false) {
fieldVisitor.postProcess(mapperService::fieldType);
documentFields = new HashMap<>();
metadataFields = new HashMap<>();
for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
if (mapperService.isMetadataField(entry.getKey())) {
metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
} else {
documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
}
// put stored fields into result objects
if (leafStoredFieldLoader.storedFields().isEmpty() == false) {
documentFields = new HashMap<>();
metadataFields = new HashMap<>();
for (Map.Entry<String, List<Object>> entry : leafStoredFieldLoader.storedFields().entrySet()) {
List<Object> values = FetchPhase.processStoredField(mapperService::fieldType, entry.getKey(), entry.getValue());
if (mapperService.isMetadataField(entry.getKey())) {
metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), values));
} else {
documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), values));
}
}
source = loader.leaf(docIdAndVersion.reader, new int[] { docIdAndVersion.docId }).source(fieldVisitor, docIdAndVersion.docId);
}
source = loader.leaf(docIdAndVersion.reader, new int[] { docIdAndVersion.docId })
.source(leafStoredFieldLoader, docIdAndVersion.docId);

if (source != null) {
// apply request-level source filtering
Expand Down Expand Up @@ -296,19 +297,18 @@ private GetResult innerGetFetch(
);
}

private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext, SourceLoader loader) {
private static StoredFieldLoader buildStoredFieldLoader(String[] fields, FetchSourceContext fetchSourceContext, SourceLoader loader) {
Set<String> fieldsToLoad = new HashSet<>();
if (fields != null && fields.length > 0) {
Set<String> fieldsToLoad = new HashSet<>();
Collections.addAll(fieldsToLoad, fields);
if (fetchSourceContext.fetchSource()) {
fieldsToLoad.addAll(loader.requiredStoredFields());
}
return new CustomFieldsVisitor(fieldsToLoad, fetchSourceContext.fetchSource());
}
Set<String> sourceFields = fetchSourceContext.fetchSource() ? loader.requiredStoredFields() : Set.of();
if (sourceFields.isEmpty()) {
return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;
if (fetchSourceContext.fetchSource()) {
fieldsToLoad.addAll(loader.requiredStoredFields());
} else {
if (fieldsToLoad.isEmpty()) {
return StoredFieldLoader.empty();
}
}
return new CustomFieldsVisitor(sourceFields, fetchSourceContext.fetchSource());
return StoredFieldLoader.create(fetchSourceContext.fetchSource(), fieldsToLoad);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,12 @@ protected SourceLoader.SyntheticFieldLoader syntheticFieldLoader(String simpleNa
);
}
if (fieldType.stored()) {
return new StringStoredFieldFieldLoader(name(), simpleName);
return new StringStoredFieldFieldLoader(name(), simpleName) {
@Override
public void load(List<Object> values) {
super.load(values.stream().map(fieldType()::valueForDisplay).toList());
}
};
}
if (hasDocValues == false) {
throw new IllegalArgumentException(
Expand Down

0 comments on commit ce63ee5

Please sign in to comment.