diff --git a/docs/changelog/119780.yaml b/docs/changelog/119780.yaml new file mode 100644 index 0000000000000..5b7226741a416 --- /dev/null +++ b/docs/changelog/119780.yaml @@ -0,0 +1,5 @@ +pr: 119780 +summary: Add index and reindex request settings to speed up reindex +area: Data streams +type: enhancement +issues: [] diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index 0ca58ecf0f0d5..0902f6ce6468a 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -155,7 +157,11 @@ public void testSettingsAddedBeforeReindex() throws Exception { // update with a dynamic setting var numReplicas = randomIntBetween(0, 10); - var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build(); + var refreshInterval = randomIntBetween(1, 100) + "s"; + var dynamicSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), refreshInterval) + .build(); indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet(); // call reindex @@ -167,6 +173,7 @@ public void testSettingsAddedBeforeReindex() throws Exception { var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey())); } public void testMappingsAddedToDestIndex() throws Exception { @@ -229,6 +236,38 @@ public void testReadOnlyAddedBack() { removeReadOnly(destIndex); } + public void testUpdateSettingsDefaultsRestored() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // ESIntegTestCase creates a template random_index_template which contains a value for number_of_replicas. + // Since this test checks the behavior of default settings, there cannot be a value for number_of_replicas, + // so we delete the template within this method. This has no effect on other tests which will still + // have the template created during their setup. + assertAcked( + indicesAdmin().execute(TransportDeleteIndexTemplateAction.TYPE, new DeleteIndexTemplateRequest("random_index_template")) + ); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked(indicesAdmin().create(new CreateIndexRequest(sourceIndex))); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex)).actionGet(); + var destSettings = settingsResponse.getIndexToSettings().get(destIndex); + + assertEquals( + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getDefault(destSettings), + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(destSettings) + ); + assertEquals( + IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getDefault(destSettings), + IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.get(destSettings) + ); + } + public void testSettingsAndMappingsFromTemplate() throws IOException { assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 6dd38d35ca9f7..d86885ce0fbe4 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -25,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; @@ -95,6 +97,7 @@ protected void doExecute( .andThen(l -> deleteDestIfExists(destIndexName, l, taskId)) .andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId)) .andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId)) + .andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId)) .andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId)) .andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId)) .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) @@ -147,6 +150,8 @@ private void createIndex( var removeReadOnlyOverride = Settings.builder() .putNull(IndexMetadata.SETTING_READ_ONLY) .putNull(IndexMetadata.SETTING_BLOCKS_WRITE) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) .build(); var request = new CreateIndexFromSourceAction.Request( @@ -168,6 +173,7 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene reindexRequest.getSearchRequest().source().fetchSource(true); reindexRequest.setDestIndex(destIndexName); reindexRequest.setParentTask(parentTaskId); + reindexRequest.setSlices(0); // equivalent to slices=auto in rest api client.execute(ReindexAction.INSTANCE, reindexRequest, listener); } @@ -186,6 +192,35 @@ private void addBlockIfFromSource( } } + private void copyOldSourceSettingsToDest( + Settings settingsBefore, + String destIndexName, + ActionListener listener, + TaskId parentTaskId + ) { + logger.debug("Updating settings on destination index after reindex completes"); + + var settings = Settings.builder(); + copySettingOrUnset(settingsBefore, settings, IndexMetadata.SETTING_NUMBER_OF_REPLICAS); + copySettingOrUnset(settingsBefore, settings, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()); + + var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), destIndexName); + updateSettingsRequest.setParentTask(parentTaskId); + var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private static void copySettingOrUnset(Settings settingsBefore, Settings.Builder builder, String setting) { + // if setting was explicitly added to the source index + if (settingsBefore.get(setting) != null) { + // copy it back to the dest index + builder.copy(setting, settingsBefore); + } else { + // otherwise, delete from dest index so that it loads from the settings default + builder.putNull(setting); + } + } + public static String generateDestIndexName(String sourceIndex) { return "migrated-" + sourceIndex; }