Skip to content

Commit

Permalink
[ML] Copy more settings when creating DF analytics destination index (#…
Browse files Browse the repository at this point in the history
…91546)

Currently, when a data frame analytics job is created, just two settings from the source index are copied to the auto-created destination index - index.number_of_shards and index.number_of_replicas.

To cater for slightly more complex source indices this PR makes changes to also copy/merge additional settings from the source indices to the destination index - index.analysis, index.similarity and index.mapping.

In the case of the index.mapping settings, when multiple source indices are involved, the settings are merged in a similar manner as for index.number_of_shards & index.number_of_replicas, i.e. by taking the maximum value of the setting across all source indices.

For index.similarity, when merging multiple indices, the similarity objects must be identical else an exception is thrown.

index.analysis is comprised of sub-objects index.analysis.filter and index.analysis.analyzer, which may in turn be comprised of multiple filter and analyzer objects. The merge procedure here is to throw an exception if identically named objects differ in content, else all filter and analyzer objects are copied over to the destination index.
  • Loading branch information
edsavage committed Nov 14, 2022
1 parent b5226ef commit fc5c1f1
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 32 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/91546.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91546
summary: Copy more settings when creating DF analytics destination index
area: Machine Learning
type: bug
issues:
- 89795
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
public final class AnalysisRegistry implements Closeable {
public static final String INDEX_ANALYSIS_CHAR_FILTER = "index.analysis.char_filter";
public static final String INDEX_ANALYSIS_FILTER = "index.analysis.filter";
public static final String INDEX_ANALYSIS_ANALYZER = "index.analysis.analyzer";
public static final String INDEX_ANALYSIS_TOKENIZER = "index.analysis.tokenizer";

public static final String DEFAULT_ANALYZER_NAME = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
Expand All @@ -38,10 +42,12 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -79,7 +85,12 @@ public final class DestinationIndex {
* If the user needs other settings on the destination index they
* should create the destination index before starting the analytics.
*/
private static final String[] PRESERVED_SETTINGS = new String[] { "index.number_of_shards", "index.number_of_replicas" };
private static final String[] PRESERVED_SETTINGS = new String[] {
"index.number_of_shards",
"index.number_of_replicas",
"index.analysis.*",
"index.similarity.*",
"index.mapping.*" };

/**
* This is the minimum compatible version of the destination index we can currently work with.
Expand Down Expand Up @@ -206,24 +217,104 @@ private static CreateIndexRequest createIndexRequest(
}

private static Settings settings(GetSettingsResponse settingsResponse) {
Integer maxNumberOfShards = findMaxSettingValue(settingsResponse, IndexMetadata.SETTING_NUMBER_OF_SHARDS);
Integer maxNumberOfReplicas = findMaxSettingValue(settingsResponse, IndexMetadata.SETTING_NUMBER_OF_REPLICAS);
String[] settingsIndexKeys = {
IndexMetadata.SETTING_NUMBER_OF_SHARDS,
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(),
MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.getKey(),
MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING.getKey(),
MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING.getKey(),
MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING.getKey(),
MapperService.INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING.getKey() };

Settings.Builder settingsBuilder = Settings.builder();
if (maxNumberOfShards != null) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, maxNumberOfShards);

for (String key : settingsIndexKeys) {
Long value = findMaxSettingValue(settingsResponse, key);
if (value != null) {
settingsBuilder.put(key, value);
}
}
if (maxNumberOfReplicas != null) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maxNumberOfReplicas);

Map<String, Tuple<String, Settings>> mergedSettings = new HashMap<>();

mergeSimilaritySettings(settingsResponse, mergedSettings);
mergeAnalysisSettings(settingsResponse, mergedSettings);

for (String settingsKey : Arrays.asList(
IndexModule.SIMILARITY_SETTINGS_PREFIX,
AnalysisRegistry.INDEX_ANALYSIS_FILTER,
AnalysisRegistry.INDEX_ANALYSIS_ANALYZER
)) {
for (Map.Entry<String, Tuple<String, Settings>> mergedSetting : mergedSettings.entrySet()) {
String index = mergedSetting.getValue().v1();
Set<String> settingsKeys = settingsResponse.getIndexToSettings().get(index).getAsSettings(settingsKey).keySet();
for (String key : settingsKeys) {
settingsBuilder = settingsBuilder.copy(settingsKey + "." + key, settingsResponse.getIndexToSettings().get(index));
}
}
}
return settingsBuilder.build();
}

private static void mergeSimilaritySettings(GetSettingsResponse settingsResponse, Map<String, Tuple<String, Settings>> mergedSettings) {
String settingsKey = IndexModule.SIMILARITY_SETTINGS_PREFIX;

for (Map.Entry<String, Settings> settingsEntry : settingsResponse.getIndexToSettings().entrySet()) {

Settings settings = settingsEntry.getValue().getAsSettings(settingsKey);
if (settings.isEmpty()) {
continue;
}

mergeSettings(settingsKey, settingsEntry.getKey(), settings, mergedSettings);
}
}

private static void mergeAnalysisSettings(GetSettingsResponse settingsResponse, Map<String, Tuple<String, Settings>> mergedSettings) {
for (String settingsKey : Arrays.asList(AnalysisRegistry.INDEX_ANALYSIS_FILTER, AnalysisRegistry.INDEX_ANALYSIS_ANALYZER)) {
for (Map.Entry<String, Settings> settingsEntry : settingsResponse.getIndexToSettings().entrySet()) {

Settings settings = settingsEntry.getValue().getAsSettings(settingsKey);
if (settings.isEmpty()) {
continue;
}

for (String name : settings.names()) {
Settings setting = settings.getAsSettings(name);
String fullName = settingsKey + "." + name;

mergeSettings(fullName, settingsEntry.getKey(), setting, mergedSettings);
}
}
}
}

private static void mergeSettings(String key, String index, Settings setting, Map<String, Tuple<String, Settings>> mergedSettings) {
if (mergedSettings.containsKey(key) == false) {
mergedSettings.put(key, new Tuple<>(index, setting));
} else {
Settings mergedSetting = mergedSettings.get(key).v2();
if (mergedSetting.equals(setting) == false) {
throw ExceptionsHelper.badRequestException(
"cannot merge settings because of differences for "
+ key
+ "; specified as [{}] in index [{}]; "
+ "specified as [{}] in index [{}]",
mergedSettings.get(key).v2(),
mergedSettings.get(key).v1(),
setting.toString(),
index
);
}
}
}

@Nullable
private static Integer findMaxSettingValue(GetSettingsResponse settingsResponse, String settingKey) {
Integer maxValue = null;
private static Long findMaxSettingValue(GetSettingsResponse settingsResponse, String settingKey) {
Long maxValue = null;
for (Settings settings : settingsResponse.getIndexToSettings().values()) {
Integer indexValue = settings.getAsInt(settingKey, null);
Long indexValue = settings.getAsLong(settingKey, null);
if (indexValue != null) {
maxValue = maxValue == null ? indexValue : Math.max(indexValue, maxValue);
}
Expand Down

0 comments on commit fc5c1f1

Please sign in to comment.