Use separate TransportVersion for percolator query serialization (#94517)

This changes the serialization format for percolator queries: when the index version is >= 8.8.0, the actual TransportVersion used for serialization is written into the stream. For BwC with old query formats, the TransportVersion mapped from the index version is used instead.

If needed, the vint used to store the TransportVersion can later be re-interpreted as something else, allowing the format to be modified further.
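As a rough sketch, the resulting stored format and read path look like this (illustrative only, reconstructed from the createQueryBuilderField and createStore changes below; the numValues/valueLength framing is added by BinaryFieldMapper, and indexVersion/input stand in for the index's created version and the doc-values StreamInput):

    // Stored query-builder payload, per index-created version:
    //   before 8.8.0:    [vint numValues][vint valueLength][NamedWriteable query]
    //   8.8.0 and later: [vint numValues][vint valueLength][vint TransportVersion id][NamedWriteable query]

    // Reading it back (mirrors PercolateQueryBuilder.createStore):
    TransportVersion transportVersion;
    if (indexVersion.before(Version.V_8_8_0)) {
        // old format: derive the transport version directly from the index version
        transportVersion = TransportVersion.fromId(indexVersion.id);
    } else {
        // new format: the transport version that wrote the query is stored in the stream
        transportVersion = TransportVersion.readVersion(input);
    }
    // only vints have been read so far, so the stream version can still be set here
    input.setTransportVersion(transportVersion);
    QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);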
thecoop committed Apr 5, 2023
1 parent 7bd40f1 commit 9fa7612
Showing 30 changed files with 232 additions and 75 deletions.
@@ -9,6 +9,7 @@
package org.elasticsearch.benchmark.index.mapper;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -51,6 +52,7 @@ public static MapperService create(String mappings) {

SimilarityService similarityService = new SimilarityService(indexSettings, null, Map.of());
MapperService mapperService = new MapperService(
() -> TransportVersion.CURRENT,
indexSettings,
IndexAnalyzers.of(
Map.of("default", new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer())),
@@ -16,6 +16,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -169,6 +170,7 @@ protected final MapperService createMapperService(String mappings) {

SimilarityService similarityService = new SimilarityService(indexSettings, null, Map.of());
MapperService mapperService = new MapperService(
() -> TransportVersion.CURRENT,
indexSettings,
(type, name) -> Lucene.STANDARD_ANALYZER,
XContentParserConfiguration.EMPTY.withRegistry(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()))
@@ -583,28 +583,34 @@ static PercolateQuery.QueryStore createStore(MappedFieldType queryBuilderFieldTy
return docId -> {
if (binaryDocValues.advanceExact(docId)) {
BytesRef qbSource = binaryDocValues.binaryValue();
try (InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length)) {
try (
StreamInput input = new NamedWriteableAwareStreamInput(
new InputStreamStreamInput(in, qbSource.length),
registry
)
) {
input.setTransportVersion(indexVersion.transportVersion);
// Query builder's content is stored via BinaryFieldMapper, which has a custom encoding
// to encode multiple binary values into a single binary doc values field.
// This is the reason we need to first need to read the number of values and
// then the length of the field value in bytes.
int numValues = input.readVInt();
assert numValues == 1;
int valueLength = input.readVInt();
assert valueLength > 0;
QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
assert in.read() == -1;
queryBuilder = Rewriteable.rewrite(queryBuilder, context);
return queryBuilder.toQuery(context);
try (
InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length);
StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, qbSource.length), registry)
) {
// Query builder's content is stored via BinaryFieldMapper, which has a custom encoding
// to encode multiple binary values into a single binary doc values field.
// This is the reason we need to first read the number of values and
// then the length of the field value in bytes.
int numValues = input.readVInt();
assert numValues == 1;
int valueLength = input.readVInt();
assert valueLength > 0;

TransportVersion transportVersion;
if (indexVersion.before(Version.V_8_8_0)) {
transportVersion = TransportVersion.fromId(indexVersion.id);
} else {
transportVersion = TransportVersion.readVersion(input);
}
// set the TransportVersion here - only vints have been read so far, so the version can still be changed freely at this point
input.setTransportVersion(transportVersion);

QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
assert in.read() == -1;
queryBuilder = Rewriteable.rewrite(queryBuilder, context);
return queryBuilder.toQuery(context);
}

} else {
return null;
}
@@ -28,6 +28,7 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.ParsingException;
@@ -96,7 +97,8 @@ public class PercolatorFieldMapper extends FieldMapper {

@Override
public FieldMapper.Builder getMergeBuilder() {
return new Builder(simpleName(), searchExecutionContext, mapUnmappedFieldsAsText, indexCreatedVersion).init(this);
return new Builder(simpleName(), searchExecutionContext, mapUnmappedFieldsAsText, indexCreatedVersion, clusterTransportVersion)
.init(this);
}

static class Builder extends FieldMapper.Builder {
@@ -107,17 +109,20 @@ static class Builder extends FieldMapper.Builder {
private final boolean mapUnmappedFieldsAsText;

private final Version indexCreatedVersion;
private final Supplier<TransportVersion> clusterTransportVersion;

Builder(
String fieldName,
Supplier<SearchExecutionContext> searchExecutionContext,
boolean mapUnmappedFieldsAsText,
Version indexCreatedVersion
Version indexCreatedVersion,
Supplier<TransportVersion> clusterTransportVersion
) {
super(fieldName);
this.searchExecutionContext = searchExecutionContext;
this.mapUnmappedFieldsAsText = mapUnmappedFieldsAsText;
this.indexCreatedVersion = Objects.requireNonNull(indexCreatedVersion);
this.clusterTransportVersion = clusterTransportVersion;
}

@Override
@@ -165,7 +170,8 @@ public PercolatorFieldMapper build(MapperBuilderContext context) {
rangeFieldMapper,
minimumShouldMatchFieldMapper,
mapUnmappedFieldsAsText,
indexCreatedVersion
indexCreatedVersion,
clusterTransportVersion
);
}

@@ -206,7 +212,8 @@ public Builder parse(String name, Map<String, Object> node, MappingParserContext
name,
parserContext.searchExecutionContext(),
getMapUnmappedFieldAsText(parserContext.getSettings()),
parserContext.indexVersionCreated()
parserContext.indexVersionCreated(),
parserContext.clusterTransportVersion()
);
}
}
@@ -354,6 +361,7 @@ Tuple<List<BytesRef>, Map<String, List<byte[]>>> extractTermsAndRanges(IndexRead
private final RangeFieldMapper rangeFieldMapper;
private final boolean mapUnmappedFieldsAsText;
private final Version indexCreatedVersion;
private final Supplier<TransportVersion> clusterTransportVersion;

PercolatorFieldMapper(
String simpleName,
@@ -367,7 +375,8 @@ Tuple<List<BytesRef>, Map<String, List<byte[]>>> extractTermsAndRanges(IndexRead
RangeFieldMapper rangeFieldMapper,
NumberFieldMapper minimumShouldMatchFieldMapper,
boolean mapUnmappedFieldsAsText,
Version indexCreatedVersion
Version indexCreatedVersion,
Supplier<TransportVersion> clusterTransportVersion
) {
super(simpleName, mappedFieldType, multiFields, copyTo);
this.searchExecutionContext = searchExecutionContext;
@@ -378,6 +387,7 @@ Tuple<List<BytesRef>, Map<String, List<byte[]>>> extractTermsAndRanges(IndexRead
this.rangeFieldMapper = rangeFieldMapper;
this.mapUnmappedFieldsAsText = mapUnmappedFieldsAsText;
this.indexCreatedVersion = indexCreatedVersion;
this.clusterTransportVersion = clusterTransportVersion;
}

@Override
Expand All @@ -399,7 +409,7 @@ public void parse(DocumentParserContext context) throws IOException {
queryBuilder = future.actionGet();

Version indexVersion = context.indexSettings().getIndexVersionCreated();
createQueryBuilderField(indexVersion, queryBuilderField, queryBuilder, context);
createQueryBuilderField(indexVersion, clusterTransportVersion.get(), queryBuilderField, queryBuilder, context);

QueryBuilder queryBuilderForProcessing = queryBuilder.rewrite(new SearchExecutionContext(executionContext));
Query query = queryBuilderForProcessing.toQuery(executionContext);
@@ -428,16 +438,27 @@ static QueryBuilder parseQueryBuilder(DocumentParserContext context) {

static void createQueryBuilderField(
Version indexVersion,
TransportVersion clusterTransportVersion,
BinaryFieldMapper qbField,
QueryBuilder queryBuilder,
DocumentParserContext context
) throws IOException {
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(stream)) {
out.setTransportVersion(indexVersion.transportVersion);
out.writeNamedWriteable(queryBuilder);
qbField.indexValue(context, stream.toByteArray());
try (
ByteArrayOutputStream stream = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(stream)
) {
if (indexVersion.before(Version.V_8_8_0)) {
// just use the index version directly
// there's a direct mapping from Version to TransportVersion before 8.8.0
out.setTransportVersion(TransportVersion.fromId(indexVersion.id));
} else {
// write the version id to the stream first
TransportVersion.writeVersion(clusterTransportVersion, out);
out.setTransportVersion(clusterTransportVersion);
}

out.writeNamedWriteable(queryBuilder);
qbField.indexValue(context, stream.toByteArray());
}
}

@@ -29,6 +29,7 @@
import org.apache.lucene.search.TermRangeQuery;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
@@ -822,6 +823,7 @@ private void assertQueryBuilder(BytesRef actual, QueryBuilder expected) throws I
// then the length of the field value in bytes.
input.readVInt();
input.readVInt();
TransportVersion.readVersion(input);
QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
assertThat(queryBuilder, equalTo(expected));
}
@@ -885,6 +887,7 @@ public void testImplicitlySetDefaultScriptLang() throws Exception {
// then the length of the field value in bytes.
input.readVInt();
input.readVInt();
TransportVersion.readVersion(input);
ScriptQueryBuilder queryBuilder = (ScriptQueryBuilder) input.readNamedWriteable(QueryBuilder.class);
assertEquals(Script.DEFAULT_SCRIPT_LANG, queryBuilder.script().getLang());
}
@@ -927,6 +930,7 @@ public void testImplicitlySetDefaultScriptLang() throws Exception {
try (StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in), writableRegistry())) {
input.readVInt();
input.readVInt();
TransportVersion.readVersion(input);
FunctionScoreQueryBuilder queryBuilder = (FunctionScoreQueryBuilder) input.readNamedWriteable(QueryBuilder.class);
ScriptScoreFunctionBuilder function = (ScriptScoreFunctionBuilder) queryBuilder.filterFunctionBuilders()[0]
.getScoreFunction();
@@ -17,6 +17,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
@@ -64,18 +65,23 @@ public void testStoringQueryBuilders() throws IOException {
BinaryFieldMapper fieldMapper = PercolatorFieldMapper.Builder.createQueryBuilderFieldBuilder(MapperBuilderContext.root(false));
MappedFieldType.FielddataOperation fielddataOperation = MappedFieldType.FielddataOperation.SEARCH;

Version version = Version.CURRENT;
try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
for (int i = 0; i < queryBuilders.length; i++) {
queryBuilders[i] = new TermQueryBuilder(randomAlphaOfLength(4), randomAlphaOfLength(8));
DocumentParserContext documentParserContext = new TestDocumentParserContext();
PercolatorFieldMapper.createQueryBuilderField(version, fieldMapper, queryBuilders[i], documentParserContext);
PercolatorFieldMapper.createQueryBuilderField(
Version.CURRENT,
TransportVersion.CURRENT,
fieldMapper,
queryBuilders[i],
documentParserContext
);
indexWriter.addDocument(documentParserContext.doc());
}
}

SearchExecutionContext searchExecutionContext = mock(SearchExecutionContext.class);
when(searchExecutionContext.indexVersionCreated()).thenReturn(version);
when(searchExecutionContext.indexVersionCreated()).thenReturn(Version.CURRENT);
when(searchExecutionContext.getWriteableRegistry()).thenReturn(writableRegistry());
when(searchExecutionContext.getParserConfig()).thenReturn(parserConfig());
when(searchExecutionContext.getForField(fieldMapper.fieldType(), fielddataOperation)).thenReturn(
@@ -10,6 +10,8 @@

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
@@ -242,13 +244,24 @@ public void testQueryBuilderBWC() throws Exception {
var hitRsp = (Map<?, ?>) ((List<?>) ((Map<?, ?>) responseAsMap(rsp).get("hits")).get("hits")).get(0);
String queryBuilderStr = (String) ((List<?>) ((Map<?, ?>) hitRsp.get("fields")).get("query.query_builder_field")).get(0);
byte[] qbSource = Base64.getDecoder().decode(queryBuilderStr);
try (InputStream in = new ByteArrayInputStream(qbSource, 0, qbSource.length)) {
try (StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in), registry)) {
input.setTransportVersion(getOldClusterVersion().transportVersion);
QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
assert in.read() == -1;
assertEquals(expectedQueryBuilder, queryBuilder);
try (
InputStream in = new ByteArrayInputStream(qbSource, 0, qbSource.length);
StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in), registry)
) {
Version clusterVersion = getOldClusterVersion();

TransportVersion transportVersion;
if (clusterVersion.before(Version.V_8_8_0)) {
transportVersion = TransportVersion.fromId(clusterVersion.id);
} else {
transportVersion = TransportVersion.readVersion(input);
}

input.setTransportVersion(transportVersion);
QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
assert in.read() == -1;
assertEquals(expectedQueryBuilder, queryBuilder);

}
}
}
@@ -13,6 +13,7 @@
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
@@ -50,19 +51,22 @@ public class IndexMetadataVerifier {
private static final Logger logger = LogManager.getLogger(IndexMetadataVerifier.class);

private final Settings settings;
private final ClusterService clusterService;
private final XContentParserConfiguration parserConfiguration;
private final MapperRegistry mapperRegistry;
private final IndexScopedSettings indexScopedSettings;
private final ScriptCompiler scriptService;

public IndexMetadataVerifier(
Settings settings,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
MapperRegistry mapperRegistry,
IndexScopedSettings indexScopedSettings,
ScriptCompiler scriptCompiler
) {
this.settings = settings;
this.clusterService = clusterService;
this.parserConfiguration = XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry)
.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
this.mapperRegistry = mapperRegistry;
@@ -168,6 +172,7 @@ protected TokenStreamComponents createComponents(String fieldName) {

try (
MapperService mapperService = new MapperService(
clusterService,
indexSettings,
(type, name) -> new NamedAnalyzer(name, AnalyzerScope.INDEX, fakeDefault.analyzer()),
parserConfiguration,
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
@@ -640,11 +640,13 @@ private static IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier
* doing so will result in an exception.
*/
public MapperService newIndexMapperService(
ClusterService clusterService,
XContentParserConfiguration parserConfiguration,
MapperRegistry mapperRegistry,
ScriptService scriptService
) throws IOException {
return new MapperService(
clusterService,
indexSettings,
analysisRegistry.build(indexSettings),
parserConfiguration,
@@ -197,6 +197,7 @@ public IndexService(
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(
clusterService,
indexSettings,
indexAnalyzers,
parserConfiguration,
