Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -39,10 +43,24 @@ public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.
public static final ParseField SOURCE_FIELD = new ParseField("source");
public static final ParseField INDEX_FIELD = new ParseField("index");

/*
* The version before which we do not support writes in the _next_ major version of Elasticsearch. For example, Elasticsearch 10.x will
* not support writing to indices created before version 9.0.0.
*/
private static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.V_8_0_0;

public ReindexDataStreamAction() {
super(NAME);
}

/*
* This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
* in order to be writable in the _next_ lucene version.
*/
public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
}

public enum Mode {
UPGRADE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

/*
* This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
Expand Down Expand Up @@ -67,10 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
return;
}
int totalIndices = dataStream.getIndices().size();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
.stream()
.filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion())
.count();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count();
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
sourceDataStreamName,
transportService.getThreadPool().absoluteTimeInMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
Expand Down Expand Up @@ -72,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
if (dataStreamInfos.size() == 1) {
List<Index> indices = dataStreamInfos.get(0).getDataStream().getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
for (Index index : indicesToBeReindexed) {
Expand Down