Skip to content

Commit

Permalink
Faster Mapping Parsing on Master and Datanode (#82382)
Browse files Browse the repository at this point in the history
A couple of straightforward mapper parsing improvements.
Save roundtrips to `String` in many spots.
Don't serialize the mapping again in the document mapper constructor
when the mapping is known to be correct (free of redundant whitespaces etc.)
because we received it from the master/CS.

relates #77466
  • Loading branch information
original-brownbear committed Jan 11, 2022
1 parent dfb9f6f commit 95a124c
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpda
private final CompressedXContent source;

public PutMappingClusterStateUpdateRequest(String source) throws IOException {
this.source = new CompressedXContent(source);
this.source = CompressedXContent.fromJSON(source);
}

public CompressedXContent source() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ public static Template resolveTemplate(
Map<String, AliasMetadata> aliasesByName = aliases.stream().collect(Collectors.toMap(AliasMetadata::getAlias, Function.identity()));

// empty request mapping as the user can't specify any explicit mappings via the simulate api
List<Map<String, Object>> mappings = MetadataCreateIndexService.collectV2Mappings(
"{}",
List<CompressedXContent> mappings = MetadataCreateIndexService.collectV2Mappings(
null,
simulatedState,
matchingTemplate,
xContentRegistry,
Expand All @@ -265,10 +265,8 @@ public static Template resolveTemplate(
indexMetadata,
tempIndexService -> {
MapperService mapperService = tempIndexService.mapperService();
for (Map<String, Object> mapping : mappings) {
if (mapping.isEmpty() == false) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MapperService.MergeReason.INDEX_TEMPLATE);
}
for (CompressedXContent mapping : mappings) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MapperService.MergeReason.INDEX_TEMPLATE);
}

DocumentMapper documentMapper = mapperService.documentMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -33,7 +32,10 @@
*/
public class MappingMetadata extends AbstractDiffable<MappingMetadata> {

public static final MappingMetadata EMPTY_MAPPINGS = new MappingMetadata("_doc", Collections.emptyMap());
public static final MappingMetadata EMPTY_MAPPINGS = new MappingMetadata(
MapperService.SINGLE_MAPPING_NAME,
Map.of(MapperService.SINGLE_MAPPING_NAME, Map.of())
);

private final String type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private ClusterState applyCreateIndexWithTemporaryService(
final boolean silent,
final IndexMetadata sourceMetadata,
final IndexMetadata temporaryIndexMeta,
final List<Map<String, Object>> mappings,
final List<CompressedXContent> mappings,
final Function<IndexService, List<AliasMetadata>> aliasSupplier,
final List<String> templatesApplied,
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer
Expand Down Expand Up @@ -518,14 +518,19 @@ private ClusterState applyCreateIndexRequestWithV1Templates(
templates.stream().map(IndexTemplateMetadata::name).collect(Collectors.toList())
);

final Map<String, Object> mappings = Collections.unmodifiableMap(
parseV1Mappings(
request.mappings(),
templates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
xContentRegistry
)
final Map<String, Object> mappingsMap = parseV1Mappings(
request.mappings(),
templates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
xContentRegistry
);

final CompressedXContent mappings;
if (mappingsMap.isEmpty()) {
mappings = null;
} else {
mappings = new CompressedXContent((builder, params) -> builder.mapContents(mappingsMap));
}

final Settings aggregatedIndexSettings = aggregateIndexSettings(
currentState,
request,
Expand All @@ -545,7 +550,7 @@ private ClusterState applyCreateIndexRequestWithV1Templates(
silent,
null,
tmpImd,
List.of(mappings),
mappings == null ? List.of() : List.of(mappings),
indexService -> resolveAndValidateAliases(
request.index(),
request.aliases(),
Expand Down Expand Up @@ -586,7 +591,7 @@ private ClusterState applyCreateIndexRequestWithV2Template(
);
}

final List<Map<String, Object>> mappings = collectV2Mappings(
final List<CompressedXContent> mappings = collectV2Mappings(
request.mappings(),
currentState,
templateName,
Expand Down Expand Up @@ -648,7 +653,7 @@ private ClusterState applyCreateIndexRequestForSystemDataStream(
}

final Map<String, ComponentTemplate> componentTemplates = request.systemDataStreamDescriptor().getComponentTemplates();
final List<Map<String, Object>> mappings = collectSystemV2Mappings(template, componentTemplates, xContentRegistry, request.index());
final List<CompressedXContent> mappings = collectSystemV2Mappings(template, componentTemplates, xContentRegistry, request.index());

final Settings aggregatedIndexSettings = aggregateIndexSettings(
currentState,
Expand Down Expand Up @@ -688,7 +693,7 @@ private ClusterState applyCreateIndexRequestForSystemDataStream(
);
}

private static List<Map<String, Object>> collectSystemV2Mappings(
private static List<CompressedXContent> collectSystemV2Mappings(
final ComposableIndexTemplate composableIndexTemplate,
final Map<String, ComponentTemplate> componentTemplates,
final NamedXContentRegistry xContentRegistry,
Expand All @@ -699,11 +704,11 @@ private static List<Map<String, Object>> collectSystemV2Mappings(
componentTemplates,
indexName
);
return collectV2Mappings("{}", templateMappings, xContentRegistry);
return collectV2Mappings(null, templateMappings, xContentRegistry);
}

public static List<Map<String, Object>> collectV2Mappings(
final String requestMappings,
public static List<CompressedXContent> collectV2Mappings(
@Nullable final String requestMappings,
final ClusterState currentState,
final String templateName,
final NamedXContentRegistry xContentRegistry,
Expand All @@ -713,20 +718,19 @@ public static List<Map<String, Object>> collectV2Mappings(
return collectV2Mappings(requestMappings, templateMappings, xContentRegistry);
}

public static List<Map<String, Object>> collectV2Mappings(
final String requestMappings,
private static List<CompressedXContent> collectV2Mappings(
@Nullable final String requestMappings,
final List<CompressedXContent> templateMappings,
final NamedXContentRegistry xContentRegistry
) throws Exception {
List<Map<String, Object>> result = new ArrayList<>();

for (CompressedXContent templateMapping : templateMappings) {
Map<String, Object> parsedTemplateMapping = MapperService.parseMapping(xContentRegistry, templateMapping);
result.add(parsedTemplateMapping);
List<CompressedXContent> result = new ArrayList<>(templateMappings.size() + 1);
result.addAll(templateMappings);
if (requestMappings != null) {
Map<String, Object> parsedRequestMappings = MapperService.parseMapping(xContentRegistry, requestMappings);
if (parsedRequestMappings.isEmpty() == false) {
result.add(new CompressedXContent((builder, params) -> builder.mapContents(parsedRequestMappings)));
}
}

Map<String, Object> parsedRequestMappings = MapperService.parseMapping(xContentRegistry, requestMappings);
result.add(parsedRequestMappings);
return result;
}

Expand Down Expand Up @@ -765,7 +769,7 @@ private ClusterState applyCreateIndexRequestWithExistingMetadata(
silent,
sourceMetadata,
tmpImd,
List.of(mappings),
List.of(),
indexService -> resolveAndValidateAliases(
request.index(),
request.aliases(),
Expand Down Expand Up @@ -797,7 +801,7 @@ static Map<String, Object> parseV1Mappings(
String mappingsJson,
List<CompressedXContent> templateMappings,
NamedXContentRegistry xContentRegistry
) throws Exception {
) throws IOException {
Map<String, Object> mappings = MapperService.parseMapping(xContentRegistry, mappingsJson);
// apply templates, merging the mappings into the request mapping if exists
for (CompressedXContent mapping : templateMappings) {
Expand Down Expand Up @@ -1211,18 +1215,17 @@ private static ClusterBlocks.Builder createClusterBlocksBuilder(ClusterState cur
private void updateIndexMappingsAndBuildSortOrder(
IndexService indexService,
CreateIndexClusterStateUpdateRequest request,
List<Map<String, Object>> mappings,
List<CompressedXContent> mappings,
@Nullable IndexMetadata sourceMetadata
) throws IOException {
MapperService mapperService = indexService.mapperService();
IndexMode indexMode = indexService.getIndexSettings() != null ? indexService.getIndexSettings().getMode() : IndexMode.STANDARD;
List<Map<String, Object>> mergedMappings = new ArrayList<>(1 + mappings.size());
mergedMappings.add(indexMode.getDefaultMapping());
mergedMappings.addAll(mappings);
for (Map<String, Object> mapping : mergedMappings) {
if (mapping.isEmpty() == false) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MergeReason.INDEX_TEMPLATE);
}
final CompressedXContent defaultMapping = indexMode.getDefaultMapping();
if (defaultMapping != null) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, defaultMapping, MergeReason.INDEX_TEMPLATE);
}
for (CompressedXContent mapping : mappings) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MergeReason.INDEX_TEMPLATE);
}
indexMode.validateTimestampFieldMapping(request.dataStreamName() != null, mapperService.mappingLookup());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ ClusterState addComponentTemplate(
return currentState;
}

validateTemplate(finalSettings, wrappedMappings, indicesService, xContentRegistry);
validateTemplate(finalSettings, wrappedMappings, indicesService);
validate(name, finalComponentTemplate);

// Validate all composable index templates that use this component template
Expand Down Expand Up @@ -959,7 +959,7 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
validateTemplate(request.settings, request.mappings, indicesService, xContentRegistry);
validateTemplate(request.settings, request.mappings, indicesService);
return innerPutTemplate(currentState, request, templateBuilder);
}

Expand Down Expand Up @@ -1429,12 +1429,8 @@ private static void validateCompositeTemplate(
});
}

private static void validateTemplate(
Settings validateSettings,
CompressedXContent mappings,
IndicesService indicesService,
NamedXContentRegistry xContentRegistry
) throws Exception {
private static void validateTemplate(Settings validateSettings, CompressedXContent mappings, IndicesService indicesService)
throws Exception {
// Hard to validate settings if they're non-existent, so used empty ones if none were provided
Settings settings = validateSettings;
if (settings == null) {
Expand Down Expand Up @@ -1466,12 +1462,7 @@ private static void validateTemplate(
createdIndex = dummyIndexService.index();

if (mappings != null) {
dummyIndexService.mapperService()
.merge(
MapperService.SINGLE_MAPPING_NAME,
MapperService.parseMapping(xContentRegistry, mappings),
MergeReason.MAPPING_UPDATE
);
dummyIndexService.mapperService().merge(MapperService.SINGLE_MAPPING_NAME, mappings, MergeReason.MAPPING_UPDATE);
}

} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private ClusterState applyRequest(
// update mapping metadata on all types
DocumentMapper mapper = mapperService.documentMapper();
if (mapper != null) {
indexMetadataBuilder.putMapping(new MappingMetadata(mapper.mappingSource()));
indexMetadataBuilder.putMapping(new MappingMetadata(mapper));
}
if (updatedMapping) {
indexMetadataBuilder.mappingVersion(1 + indexMetadataBuilder.mappingVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
Expand All @@ -34,7 +35,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -45,6 +45,18 @@ public class MetadataMigrateToDataStreamService {

private static final Logger logger = LogManager.getLogger(MetadataMigrateToDataStreamService.class);

private static final CompressedXContent TIMESTAMP_MAPPING;

static {
try {
TIMESTAMP_MAPPING = new CompressedXContent(
((builder, params) -> builder.startObject(DataStreamTimestampFieldMapper.NAME).field("enabled", true).endObject())
);
} catch (IOException e) {
throw new AssertionError(e);
}
}

private final ClusterService clusterService;
private final ActiveShardsObserver activeShardsObserver;
private final IndicesService indexServices;
Expand Down Expand Up @@ -169,11 +181,7 @@ static void prepareBackingIndex(

MapperService mapperService = mapperSupplier.apply(im);
mapperService.merge(im, MapperService.MergeReason.MAPPING_RECOVERY);
mapperService.merge(
"_doc",
Map.of(DataStreamTimestampFieldMapper.NAME, Map.of("enabled", true)),
MapperService.MergeReason.MAPPING_UPDATE
);
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, TIMESTAMP_MAPPING, MapperService.MergeReason.MAPPING_UPDATE);
DocumentMapper mapper = mapperService.documentMapper();

var imb = IndexMetadata.builder(im);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -135,6 +137,27 @@ public CompressedXContent(byte[] data) throws IOException {
this(new BytesArray(data));
}

/**
* Parses the given JSON string and then serializes it back in compressed form without any whitespaces. This is used to normalize
* mapping json strings for deduplication.
*
* @param json string containing valid JSON
* @return compressed x-content normalized to not contain any whitespaces
*/
public static CompressedXContent fromJSON(String json) throws IOException {
return new CompressedXContent(new ToXContent() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.copyCurrentStructure(JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json));
}

@Override
public boolean isFragment() {
return false;
}
});
}

public CompressedXContent(String str) throws IOException {
this(new BytesArray(str.getBytes(StandardCharsets.UTF_8)));
}
Expand Down

0 comments on commit 95a124c

Please sign in to comment.