Merged
1 change: 1 addition & 0 deletions modules/percolator/src/main/java/module-info.java
@@ -16,4 +16,5 @@
     requires org.apache.lucene.memory;
     requires org.apache.lucene.queries;
     requires org.apache.lucene.sandbox;
+    requires org.elasticsearch.logging;
 }
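This one-line module change exists because the PercolateQueryBuilder diff below starts using the org.elasticsearch.logging API, and the Java module system only resolves packages that are explicitly required. A minimal sketch of the dependency the new clause satisfies; the class name here is illustrative, while the getLogger call mirrors the field added further down:

import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

// This reference only compiles under the module system once module-info.java
// declares `requires org.elasticsearch.logging;`.
class LoggingDependencySketch {
    private static final Logger LOGGER = LogManager.getLogger(LoggingDependencySketch.class);
}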
modules/percolator/src/main/java/org/elasticsearch/percolator/PercolateQueryBuilder.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@@ -63,6 +64,11 @@
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.search.lookup.Source;
+import org.elasticsearch.search.lookup.SourceFilter;
+import org.elasticsearch.search.lookup.SourceProvider;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ParseField;
@@ -86,6 +92,8 @@
 import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

 public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBuilder> {
+    private static final Logger LOGGER = LogManager.getLogger(PercolateQueryBuilder.class);
+
     public static final String NAME = "percolate";

     static final ParseField DOCUMENT_FIELD = new ParseField("document");
@@ -557,41 +565,81 @@ static PercolateQuery.QueryStore createStore(MappedFieldType queryBuilderFieldType
         return docId -> {
             if (binaryDocValues.advanceExact(docId)) {
                 BytesRef qbSource = binaryDocValues.binaryValue();
-                try (
-                    InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length);
-                    StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, qbSource.length), registry)
-                ) {
-                    // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding
-                    // to encode multiple binary values into a single binary doc values field.
-                    // This is the reason we need to first read the number of values and
-                    // then the length of the field value in bytes.
-                    int numValues = input.readVInt();
-                    assert numValues == 1;
-                    int valueLength = input.readVInt();
-                    assert valueLength > 0;
-
-                    TransportVersion transportVersion;
-                    if (indexVersion.before(IndexVersions.V_8_8_0)) {
-                        transportVersion = TransportVersion.fromId(indexVersion.id());
-                    } else {
-                        transportVersion = TransportVersion.readVersion(input);
+                QueryBuilder queryBuilder = readQueryBuilder(qbSource, registry, indexVersion, () -> {
+                    // query builder is written in an incompatible format, fall back to reading it from source
+                    if (context.isSourceEnabled() == false) {
+                        throw new ElasticsearchException(
+                            "Unable to read percolator query. Original transport version is incompatible and source is "
+                                + "unavailable on index [{}].",
+                            context.index().getName()
+                        );
                     }
-                    // set the transportversion here - only read vints so far, so can change the version freely at this point
-                    input.setTransportVersion(transportVersion);
-
-                    QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
-                    assert in.read() == -1;
-                    queryBuilder = Rewriteable.rewrite(queryBuilder, context);
-                    return queryBuilder.toQuery(context);
-                }
+
+                    LOGGER.warn(
+                        "Reading percolator query from source. For best performance, reindexing of index [{}] is required.",
+                        context.index().getName()
+                    );
+                    SourceProvider sourceProvider = context.createSourceProvider(new SourceFilter(null, null));
+                    Source source = sourceProvider.getSource(ctx, docId);
+                    SourceToParse sourceToParse = new SourceToParse(
+                        String.valueOf(docId),
+                        source.internalSourceRef(),
+                        source.sourceContentType()
+                    );
+
+                    return context.parseDocument(sourceToParse).rootDoc().getBinaryValue(queryBuilderFieldType.name());
+                });
+
+                queryBuilder = Rewriteable.rewrite(queryBuilder, context);
+                return queryBuilder.toQuery(context);
             } else {
                 return null;
             }
         };
     };
 }
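For readers unfamiliar with the layout the removed comments above describe (and which readQueryBuilder below still decodes), here is a minimal, self-contained sketch of the binary doc-values shape: a vint value count, a vint value length, then the serialized query builder. It is plain Java with no Elasticsearch dependencies, so the vint helper and all class names are illustrative rather than the real API:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class QueryBuilderFieldLayoutSketch {
    // Lucene-style vint: 7 payload bits per byte, high bit set means "more bytes follow".
    static int readVInt(InputStream in) throws IOException {
        int b = in.read();
        int value = b & 0x7F;
        for (int shift = 7; (b & 0x80) != 0; shift += 7) {
            b = in.read();
            value |= (b & 0x7F) << shift;
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        // [vint value-count][vint value-length][value bytes] - the shape createStore() expects,
        // with exactly one stored query per percolator document.
        byte[] encoded = { 0x01, 0x03, 0x0A, 0x0B, 0x0C };
        InputStream in = new ByteArrayInputStream(encoded);
        int numValues = readVInt(in);   // must be 1
        int valueLength = readVInt(in); // length of the serialized query builder
        byte[] payload = in.readNBytes(valueLength);
        System.out.println(numValues + " value(s), payload of " + payload.length + " byte(s)");
    }
}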

+    private static QueryBuilder readQueryBuilder(
+        BytesRef bytesRef,
+        NamedWriteableRegistry registry,
+        IndexVersion indexVersion,
+        CheckedSupplier<BytesRef, IOException> fallbackSource
+    ) throws IOException {
+        try (
+            InputStream in = new ByteArrayInputStream(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+            StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, bytesRef.length), registry)
+        ) {
+            // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding
+            // to encode multiple binary values into a single binary doc values field.
+            // This is the reason we need to first read the number of values and
+            // then the length of the field value in bytes.
+            int numValues = input.readVInt();
+            assert numValues == 1;
+            int valueLength = input.readVInt();
+            assert valueLength > 0;
+
+            TransportVersion transportVersion;
+            if (indexVersion.before(IndexVersions.V_8_8_0)) {
Contributor: I assume when we remove all versions prior to V_8_8_0 this if/else just becomes the else in a safe way now, since it'll either reload from source or reject if there is no source?

Contributor (author): This is IndexVersion, so this won't go away until ES 10.

Contributor: Ah, yes. Oops! :)
+                transportVersion = TransportVersion.fromId(indexVersion.id());
+            } else {
+                transportVersion = TransportVersion.readVersion(input);
+            }
+
+            QueryBuilder queryBuilder;
+
+            if (TransportVersion.isCompatible(transportVersion) || fallbackSource == null) {
+                // set the transport version here - only vints have been read so far, so the version can be changed freely at this point
+                input.setTransportVersion(transportVersion);
+                queryBuilder = input.readNamedWriteable(QueryBuilder.class);
+                assert in.read() == -1;
+            } else {
+                // incompatible transport version, try the fallback
+                queryBuilder = readQueryBuilder(fallbackSource.get(), registry, indexVersion, null);
+            }
+
+            return queryBuilder;
+        }
+    }
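A detail worth noting in readQueryBuilder, and the point the review thread circles around: the retry path passes null as the fallback, which is what bounds the recursion, so a source-derived value that is itself incompatible fails instead of looping. A dependency-free sketch of that pattern, where the leading-byte check stands in for TransportVersion.isCompatible and every name is illustrative:

import java.util.function.Supplier;

final class FallbackDecodeSketch {
    static String decode(byte[] bytes, Supplier<byte[]> fallbackSource) {
        boolean compatible = bytes.length > 0 && bytes[0] == 1; // stand-in for the real version check
        if (compatible || fallbackSource == null) {
            if (compatible == false) {
                throw new IllegalStateException("incompatible format and no fallback available");
            }
            return "decoded " + (bytes.length - 1) + " payload byte(s)";
        }
        // retry exactly once, with the fallback disabled so recursion terminates
        return decode(fallbackSource.get(), null);
    }

    public static void main(String[] args) {
        byte[] incompatible = { 0, 42 };
        // first attempt fails the version check, second succeeds on the re-encoded bytes
        System.out.println(decode(incompatible, () -> new byte[] { 1, 7, 7 }));
    }
}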

     static SearchExecutionContext wrap(SearchExecutionContext delegate) {
         return new SearchExecutionContext(delegate) {

QueryBuilderBWCIT.java
@@ -11,18 +11,10 @@

import com.carrotsearch.randomizedtesting.annotations.Name;

-import org.elasticsearch.TransportVersion;
-import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
-import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.Fuzziness;
-import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
 import org.elasticsearch.index.query.DisMaxQueryBuilder;
@@ -37,33 +29,23 @@
 import org.elasticsearch.index.query.SpanTermQueryBuilder;
 import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
 import org.elasticsearch.index.query.functionscore.RandomScoreFunctionBuilder;
-import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
 import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.junit.ClassRule;

-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;

-import static org.elasticsearch.cluster.ClusterState.VERSION_INTRODUCING_TRANSPORT_VERSIONS;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

 /**
  * An integration test that tests whether percolator queries stored in older supported ES version can still be read by the
  * current ES version. Percolator queries are stored in the binary format in a dedicated doc values field (see
- * PercolatorFieldMapper#createQueryBuilderField(...) method). Using the query builders writable contract. This test
- * does best effort verifying that we don't break bwc for query builders between the first previous major version and
- * the latest current major release.
- *
- * The queries to test are specified in json format, which turns out to work because we tend break here rarely. If the
- * json format of a query being tested here then feel free to change this.
+ * PercolatorFieldMapper#createQueryBuilderField(...) method). We don't attempt to assert anything on results here; simply executing
+ * a percolator query will force deserialization of the old query builder. This also verifies that our fallback compatibility
+ * functionality is working correctly, otherwise the search request will throw an exception.
  */
public class QueryBuilderBWCIT extends ParameterizedFullClusterRestartTestCase {
private static final List<Object[]> CANDIDATES = new ArrayList<>();
@@ -227,43 +209,20 @@ public void testQueryBuilderBWC() throws Exception {
                     assertEquals(201, rsp.getStatusLine().getStatusCode());
                 }
             } else {
-                NamedWriteableRegistry registry = new NamedWriteableRegistry(
-                    new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables()
-                );
-
-                for (int i = 0; i < CANDIDATES.size(); i++) {
-                    QueryBuilder expectedQueryBuilder = (QueryBuilder) CANDIDATES.get(i)[1];
-                    Request request = new Request("GET", "/" + index + "/_search");
-                    request.setJsonEntity(Strings.format("""
-                        {"query": {"ids": {"values": ["%s"]}}, "docvalue_fields": [{"field":"query.query_builder_field"}]}
-                        """, i));
-                    Response rsp = client().performRequest(request);
-                    assertEquals(200, rsp.getStatusLine().getStatusCode());
-                    var hitRsp = (Map<?, ?>) ((List<?>) ((Map<?, ?>) responseAsMap(rsp).get("hits")).get("hits")).get(0);
-                    String queryBuilderStr = (String) ((List<?>) ((Map<?, ?>) hitRsp.get("fields")).get("query.query_builder_field")).get(0);
-                    byte[] qbSource = Base64.getDecoder().decode(queryBuilderStr);
-                    try (
-                        InputStream in = new ByteArrayInputStream(qbSource, 0, qbSource.length);
-                        StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in), registry)
-                    ) {
-                        @UpdateForV10(owner = UpdateForV10.Owner.SEARCH_FOUNDATIONS) // won't need to read <8.8 data anymore
-                        boolean originalClusterHasTransportVersion = parseLegacyVersion(getOldClusterVersion()).map(
-                            v -> v.onOrAfter(VERSION_INTRODUCING_TRANSPORT_VERSIONS)
-                        ).orElse(true);
-                        TransportVersion transportVersion;
-                        if (originalClusterHasTransportVersion == false) {
-                            transportVersion = TransportVersion.fromId(
-                                parseLegacyVersion(getOldClusterVersion()).map(Version::id).orElse(TransportVersion.minimumCompatible().id())
-                            );
-                        } else {
-                            transportVersion = TransportVersion.readVersion(input);
-                        }
-                        input.setTransportVersion(transportVersion);
-                        QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
-                        assert in.read() == -1;
-                        assertEquals(expectedQueryBuilder, queryBuilder);
-                    }
-                }
+                Request request = new Request("GET", "/" + index + "/_search");
+                request.setJsonEntity("""
+                    {
+                      "query": {
+                        "percolate": {
+                          "field": "query",
+                          "document": {
+                            "foo": "bar"
+                          }
+                        }
+                      }
+                    }""");
+                Response rsp = client().performRequest(request);
+                assertEquals(200, rsp.getStatusLine().getStatusCode());
             }
         }
     }
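For orientation, the old-cluster branch elided above indexes one document per CANDIDATES entry, each storing a percolator query that the percolate search above then forces through deserialization. A hypothetical sketch of one such indexing call, in the style of the surrounding test; the document id and query body are assumptions for illustration, not copied from the test:

// Illustrative only: stores a percolator query in the "query" field that the
// percolate request in the upgraded-cluster branch will later deserialize.
Request indexRequest = new Request("PUT", "/" + index + "/_doc/0");
indexRequest.setJsonEntity("""
    {
      "query": { "match": { "foo": "bar" } }
    }""");
Response indexRsp = client().performRequest(indexRequest);
assertEquals(201, indexRsp.getStatusLine().getStatusCode());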