Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;

public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

Expand Down Expand Up @@ -153,4 +155,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
return List.of();
}
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> pluginSettings = new ArrayList<>();
pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
return pluginSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.migrate.task;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand All @@ -39,6 +42,19 @@
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
/*
* This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five
* data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic,
* but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node).
*/
public static final Setting<Integer> MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting(
"migrate.max_concurrent_indices_reindexed_per_data_stream",
1,
1,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -165,8 +181,9 @@ private void reindexIndices(
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
final int maxConcurrentIndices = 1;
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
}
Expand Down