Encapsulate source filtering (#91127)
We have two implementations of source filtering: one based on Map filtering,
used by SourceLookup, and one based on Jackson stream filtering, used
in Get and SourceFieldMapper. There are cases where stream filtering could
usefully be applied to source in the fetch phase, for example when the source is
not being used as a Map by any other subphase; and conversely, if a
source has already been parsed into a Map, then map filtering will generally
be more efficient than stream filtering, which ends up re-parsing the bytes.

This commit encapsulates all of this filtering logic in a single SourceFilter
class, which can be passed to the filter method on Source. Different
Source implementations can choose between map and stream filtering
depending on whether they have a map or bytes representation available.
romseygeek committed Oct 31, 2022
1 parent 90ad248 commit ba7a219
Showing 18 changed files with 450 additions and 224 deletions.
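
Before the per-file diffs, here is a minimal sketch of how the consolidated API fits together, assembled from the calls that appear in the changes below; the JSON payload, class name, and include/exclude values are illustrative scaffolding, not part of the commit:

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceFilter;

public class SourceFilterSketch {
    public static void main(String[] args) {
        // Request-level filtering rules (illustrative values).
        FetchSourceContext fetchContext = FetchSourceContext.of(
            true,
            Strings.splitStringByCommaToArray("message"),
            Strings.splitStringByCommaToArray("")
        );
        SourceFilter filter = fetchContext.filter();

        Source source = Source.fromBytes(new BytesArray("{\"message\": \"hello\", \"other\": 1}"));

        // Generic entry point: the Source implementation picks map or stream filtering.
        Source filtered = source.filter(filter);

        // Explicit strategies, as exercised by the benchmarks below.
        Source viaMap = filter.filterMap(source);
        Source viaBytes = filter.filterBytes(source);

        System.out.println(filtered.internalSourceRef().utf8ToString());
    }
}

Source.filter lets each implementation pick the cheaper strategy, while filterMap and filterBytes force one; comparing those two forced paths is exactly what the new benchmark below measures.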
@@ -77,9 +77,15 @@ private BytesReference buildBigExample(String extraText) throws IOException {
     }

     @Benchmark
-    public BytesReference filterObjects() {
+    public BytesReference filterSourceMap() {
         Source bytesSource = Source.fromBytes(sourceBytes);
-        return Source.fromMap(bytesSource.filter(fetchContext), bytesSource.sourceContentType()).internalSourceRef();
+        return fetchContext.filter().filterMap(bytesSource).internalSourceRef();
     }
+
+    @Benchmark
+    public BytesReference filterSourceBytes() {
+        Source bytesSource = Source.fromBytes(sourceBytes);
+        return fetchContext.filter().filterBytes(bytesSource).internalSourceRef();
+    }

     @Benchmark
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.benchmark.search.fetch.subphase;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceFilter;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class SourceFilteringBenchmark {

    private BytesReference sourceBytes;
    private SourceFilter filter;

    @Param({ "tiny", "short", "one_4k_field", "one_4m_field" })
    private String source;
    @Param({ "message" })
    private String includes;
    @Param({ "" })
    private String excludes;

    @Setup
    public void setup() throws IOException {
        sourceBytes = switch (source) {
            case "tiny" -> new BytesArray("{\"message\": \"short\"}");
            case "short" -> read300BytesExample();
            case "one_4k_field" -> buildBigExample("huge".repeat(1024));
            case "one_4m_field" -> buildBigExample("huge".repeat(1024 * 1024));
            default -> throw new IllegalArgumentException("Unknown source [" + source + "]");
        };
        FetchSourceContext fetchContext = FetchSourceContext.of(
            true,
            Strings.splitStringByCommaToArray(includes),
            Strings.splitStringByCommaToArray(excludes)
        );
        filter = fetchContext.filter();
    }

    private BytesReference read300BytesExample() throws IOException {
        return Streams.readFully(FetchSourcePhaseBenchmark.class.getResourceAsStream("300b_example.json"));
    }

    private BytesReference buildBigExample(String extraText) throws IOException {
        String bigger = read300BytesExample().utf8ToString();
        bigger = "{\"huge\": \"" + extraText + "\"," + bigger.substring(1);
        return new BytesArray(bigger);
    }

    // We want to compare map filtering with bytes filtering when the map has already
    // been parsed.

    @Benchmark
    public Source filterMap() {
        Source source = Source.fromBytes(sourceBytes);
        source.source(); // build map
        return filter.filterMap(source);
    }

    @Benchmark
    public Source filterBytes() {
        Source source = Source.fromBytes(sourceBytes);
        source.source(); // build map
        return filter.filterBytes(source);
    }
}
@@ -55,7 +55,7 @@ public void testHitsExecute() throws Exception {
         LeafReaderContext context = reader.leaves().get(0);
         // A match:
         {
-            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
             PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
             MemoryIndex memoryIndex = new MemoryIndex();
             memoryIndex.addField("field", "value", new WhitespaceAnalyzer());
@@ -86,7 +86,7 @@ public void testHitsExecute() throws Exception {

         // No match:
         {
-            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
             PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
             MemoryIndex memoryIndex = new MemoryIndex();
             memoryIndex.addField("field", "value1", new WhitespaceAnalyzer());
@@ -116,7 +116,7 @@ public void testHitsExecute() throws Exception {

         // No query:
         {
-            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+            HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
             PercolateQuery.QueryStore queryStore = ctx -> docId -> null;
             MemoryIndex memoryIndex = new MemoryIndex();
             memoryIndex.addField("field", "value", new WhitespaceAnalyzer());
@@ -10,13 +10,11 @@

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
@@ -34,8 +32,7 @@
 import org.elasticsearch.script.UpdateCtxMap;
 import org.elasticsearch.script.UpdateScript;
 import org.elasticsearch.script.UpsertCtxMap;
-import org.elasticsearch.search.lookup.SourceLookup;
-import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.xcontent.XContentType;

 import java.io.IOException;
@@ -315,6 +312,7 @@ private <T extends UpdateCtxMap> T executeScript(Script script, T ctxMap) {

     /**
      * Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
+     * // TODO can we pass a Source here rather than Map, XcontentType and BytesReference?
      */
     public static GetResult extractGetResult(
         final UpdateRequest request,
@@ -329,21 +327,9 @@
         if (request.fetchSource() == null || request.fetchSource().fetchSource() == false) {
             return null;
         }
-
         BytesReference sourceFilteredAsBytes = sourceAsBytes;
-        if (request.fetchSource().includes().length > 0 || request.fetchSource().excludes().length > 0) {
-            SourceLookup sourceLookup = new SourceLookup(new SourceLookup.MapSourceProvider(source));
-            Object value = sourceLookup.filter(request.fetchSource());
-            try {
-                final int initialCapacity = sourceAsBytes != null ? Math.min(1024, sourceAsBytes.length()) : 1024;
-                BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
-                try (XContentBuilder builder = new XContentBuilder(sourceContentType.xContent(), streamOutput)) {
-                    builder.value(value);
-                    sourceFilteredAsBytes = BytesReference.bytes(builder);
-                }
-            } catch (IOException e) {
-                throw new ElasticsearchException("Error filtering source", e);
-            }
+        if (request.fetchSource().hasFilter()) {
+            sourceFilteredAsBytes = Source.fromMap(source, sourceContentType).filter(request.fetchSource().filter()).internalSourceRef();
         }

         // TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
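
The net effect of the hunk above is that the hand-rolled SourceLookup-plus-XContentBuilder round trip collapses into a single call on the already-parsed map. A hedged sketch of that pattern in isolation (the map contents are illustrative, and JSON is assumed for the content type):

import java.util.Map;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.xcontent.XContentType;

public class ExtractGetResultSketch {
    public static void main(String[] args) {
        // A _source map that has already been parsed (illustrative data).
        Map<String, Object> source = Map.of("message", "hello", "counter", 7);
        FetchSourceContext fetchSource = FetchSourceContext.of(true, new String[] { "message" }, new String[0]);

        if (fetchSource.hasFilter()) {
            BytesReference filtered = Source.fromMap(source, XContentType.JSON)
                .filter(fetchSource.filter())
                .internalSourceRef();
            System.out.println(filtered.utf8ToString());
        }
    }
}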

This file was deleted.

@@ -252,15 +252,15 @@ private static Object extractValue(String[] pathElements, int index, Object curr
      * document contains {@code a.b} as a property and {@code a} is an include,
      * then {@code a.b} will be kept in the filtered map.
      */
-    public static Map<String, Object> filter(Map<String, ?> map, String[] includes, String[] excludes) {
+    public static Map<String, Object> filter(Map<String, Object> map, String[] includes, String[] excludes) {
         return filter(includes, excludes).apply(map);
     }

     /**
      * Returns a function that filters a document map based on the given include and exclude rules.
      * @see #filter(Map, String[], String[]) for details
      */
-    public static Function<Map<String, ?>, Map<String, Object>> filter(String[] includes, String[] excludes) {
+    public static Function<Map<String, Object>, Map<String, Object>> filter(String[] includes, String[] excludes) {
         CharacterRunAutomaton matchAllAutomaton = new CharacterRunAutomaton(Automata.makeAnyString());

         CharacterRunAutomaton include;
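
The signature here only tightens from Map<String, ?> to Map<String, Object>; the behavior is unchanged. As a quick illustration of the include/exclude semantics described in the javadoc above (the document contents are illustrative, and the package name is the one XContentMapValues conventionally lives in):

import java.util.Map;

import org.elasticsearch.common.xcontent.support.XContentMapValues;

public class XContentMapValuesSketch {
    public static void main(String[] args) {
        // Per the javadoc above, including "a" keeps nested properties such as "a.b".
        Map<String, Object> doc = Map.of("a", Map.of("b", 1, "c", 2), "d", 3);
        Map<String, Object> filtered = XContentMapValues.filter(doc, new String[] { "a" }, new String[] { "a.c" });
        System.out.println(filtered); // expected: {a={b=1}}
    }
}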
@@ -15,7 +15,6 @@
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
-import org.elasticsearch.common.xcontent.XContentFieldFilter;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
@@ -32,6 +31,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.lookup.Source;

 import java.io.IOException;
 import java.util.Collections;
@@ -282,13 +282,8 @@ private GetResult innerGetFetch(
             // apply request-level source filtering
             if (fetchSourceContext.fetchSource() == false) {
                 source = null;
-            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
-                try {
-                    source = XContentFieldFilter.newFieldFilter(fetchSourceContext.includes(), fetchSourceContext.excludes())
-                        .apply(source, null);
-                } catch (IOException e) {
-                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
-                }
+            } else if (fetchSourceContext.hasFilter()) {
+                source = Source.fromBytes(source).filter(fetchSourceContext.filter()).internalSourceRef();
             }
         }
