diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java new file mode 100644 index 0000000000000..fdc96892d4b27 --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class ReindexDataStreamTransportActionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class); + } + + public void testNonExistentDataStream() { + String nonExistentDataStreamName = randomAlphaOfLength(50); + ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(nonExistentDataStreamName); + assertThrows( + ResourceNotFoundException.class, + () -> client().execute(new ActionType(ReindexDataStreamAction.NAME), reindexDataStreamRequest) + .actionGet() + ); + } + + public void testAlreadyUpToDateDataStream() throws Exception { + String dataStreamName = randomAlphaOfLength(50).toLowerCase(Locale.ROOT); + ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(dataStreamName); + createDataStream(dataStreamName); + ReindexDataStreamResponse response = client().execute( + new ActionType(ReindexDataStreamAction.NAME), + reindexDataStreamRequest + ).actionGet(); + String persistentTaskId = response.getTaskId(); + assertThat(persistentTaskId, equalTo("reindex-data-stream-" + dataStreamName)); + AtomicReference runningTask = new AtomicReference<>(); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + TaskManager taskManager = transportService.getTaskManager(); + Map tasksMap = taskManager.getCancellableTasks(); + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof ReindexDataStreamTask + && persistentTaskId.equals((((ReindexDataStreamTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + optionalTask.ifPresent( + longCancellableTaskEntry -> runningTask.compareAndSet(null, (ReindexDataStreamTask) longCancellableTaskEntry.getValue()) + ); + } + ReindexDataStreamTask task = runningTask.get(); + assertNotNull(task); + assertThat(task.getStatus().complete(), equalTo(true)); + assertNull(task.getStatus().exception()); + assertThat(task.getStatus().pending(), equalTo(0)); + assertThat(task.getStatus().inProgress(), equalTo(0)); + assertThat(task.getStatus().errors().size(), equalTo(0)); + } + + private void createDataStream(String dataStreamName) { + final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest = + new TransportPutComposableIndexTemplateAction.Request("my-template"); + putComposableTemplateRequest.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .template(Template.builder().build()) + .build() + ); + final AcknowledgedResponse putComposableTemplateResponse = safeGet( + client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest) + ); + assertThat(putComposableTemplateResponse.isAcknowledged(), is(true)); + + final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStreamName + ); + final AcknowledgedResponse createDataStreamResponse = safeGet( + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest) + ); + assertThat(createDataStreamResponse.isAcknowledged(), is(true)); + indexDocs(dataStreamName); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + indexDocs(dataStreamName); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + } + + private void indexDocs(String dataStreamName) { + int docs = randomIntBetween(5, 10); + CountDownLatch countDownLatch = new CountDownLatch(docs); + for (int i = 0; i < docs; i++) { + var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + final String doc = "{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"something cool happened\" }"; + indexRequest.source(doc, XContentType.JSON); + client().index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(DocWriteResponse docWriteResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Indexing request should have succeeded eventually, failed with " + e.getMessage()); + } + }); + } + safeAwait(countDownLatch); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index cb7445705537a..2f3b63d27ca35 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -19,19 +19,23 @@ import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction; @@ -40,6 +44,7 @@ import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction; import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; +import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -73,14 +78,27 @@ import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; +import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor; +import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState; +import org.elasticsearch.datastreams.task.ReindexDataStreamStatus; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; import org.elasticsearch.index.IndexSettingProvider; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.HealthPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; import java.io.IOException; import java.time.Clock; @@ -93,7 +111,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; -public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin { +public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin { public static final Setting TIME_SERIES_POLL_INTERVAL = Setting.timeSetting( "time_series.poll_interval", @@ -244,6 +262,7 @@ public Collection createComponents(PluginServices services) { actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class)); actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); } + actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); return actions; } @@ -302,4 +321,48 @@ public void close() throws IOException { public Collection getHealthIndicatorServices() { return List.of(dataStreamLifecycleHealthIndicatorService.get()); } + + @Override + public List getNamedXContent() { + return List.of( + new NamedXContentRegistry.Entry( + PersistentTaskState.class, + new ParseField(ReindexDataStreamPersistentTaskState.NAME), + ReindexDataStreamPersistentTaskState::fromXContent + ), + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(ReindexDataStreamTaskParams.NAME), + ReindexDataStreamTaskParams::fromXContent + ) + ); + } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + PersistentTaskState.class, + ReindexDataStreamPersistentTaskState.NAME, + ReindexDataStreamPersistentTaskState::new + ), + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + ReindexDataStreamTaskParams.NAME, + ReindexDataStreamTaskParams::new + ), + new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new) + ); + } + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); + } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java new file mode 100644 index 0000000000000..0a86985c6c7b2 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/* + * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation + * of the persistent task, it responds with the persistent task id so that the user can monitor the persistent task. + */ +public class ReindexDataStreamTransportAction extends HandledTransportAction { + private final PersistentTasksService persistentTasksService; + private final TransportService transportService; + private final ClusterService clusterService; + + @Inject + public ReindexDataStreamTransportAction( + TransportService transportService, + ActionFilters actionFilters, + PersistentTasksService persistentTasksService, + ClusterService clusterService + ) { + super( + ReindexDataStreamAction.NAME, + true, + transportService, + actionFilters, + ReindexDataStreamRequest::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.transportService = transportService; + this.persistentTasksService = persistentTasksService; + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Task task, ReindexDataStreamRequest request, ActionListener listener) { + String sourceDataStreamName = request.getSourceDataStream(); + Metadata metadata = clusterService.state().metadata(); + DataStream dataStream = metadata.dataStreams().get(sourceDataStreamName); + if (dataStream == null) { + listener.onFailure(new ResourceNotFoundException("Data stream named [{}] does not exist", sourceDataStreamName)); + return; + } + int totalIndices = dataStream.getIndices().size(); + int totalIndicesToBeUpgraded = (int) dataStream.getIndices() + .stream() + .filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion()) + .count(); + ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( + sourceDataStreamName, + transportService.getThreadPool().absoluteTimeInMillis(), + totalIndices, + totalIndicesToBeUpgraded + ); + String persistentTaskId = getPersistentTaskId(sourceDataStreamName); + persistentTasksService.sendStartRequest( + persistentTaskId, + ReindexDataStreamTask.TASK_NAME, + params, + null, + ActionListener.wrap(startedTask -> listener.onResponse(new ReindexDataStreamResponse(persistentTaskId)), listener::onFailure) + ); + } + + private String getPersistentTaskId(String dataStreamName) throws ResourceAlreadyExistsException { + return "reindex-data-stream-" + dataStreamName; + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java new file mode 100644 index 0000000000000..bcd1ae314ac37 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; + +public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { + private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); + private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; + + public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) { + super(taskName, threadPool.generic()); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + @Override + protected ReindexDataStreamTask createTask( + long id, + String type, + String action, + TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers + ) { + ReindexDataStreamTaskParams params = taskInProgress.getParams(); + return new ReindexDataStreamTask( + params.startTime(), + params.totalIndices(), + params.totalIndicesToBeUpgraded(), + threadPool, + id, + type, + action, + "id=" + taskInProgress.getId(), + parentTaskId, + headers + ); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) { + String sourceDataStream = params.getSourceDataStream(); + GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); + assert task instanceof ReindexDataStreamTask; + final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; + client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { + List dataStreamInfos = response.getDataStreams(); + if (dataStreamInfos.size() == 1) { + List indices = dataStreamInfos.get(0).getDataStream().getIndices(); + List indicesToBeReindexed = indices.stream() + .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) + .toList(); + reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + for (Index index : indicesToBeReindexed) { + // TODO This is just a placeholder. This is where the real data stream reindex logic will go + } + + completeSuccessfulPersistentTask(reindexDataStreamTask); + } else { + completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist")); + } + }, reindexDataStreamTask::markAsFailed)); + } + + private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { + persistentTask.reindexSucceeded(); + threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); + } + + private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { + persistentTask.reindexFailed(e); + threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); + } + + private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) { + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask( + reindexDataStreamTask.getPersistentTaskId() + ); + PersistentTaskState state = persistentTask.getState(); + final long completionTime; + if (state == null) { + completionTime = threadPool.absoluteTimeInMillis(); + reindexDataStreamTask.updatePersistentTaskState( + new ReindexDataStreamPersistentTaskState(completionTime), + ActionListener.noop() + ); + } else { + completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime(); + } + return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java new file mode 100644 index 0000000000000..d6f32a3d34a7a --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState { + public static final String NAME = ReindexDataStreamTask.TASK_NAME; + private static final String COMPLETION_TIME_FIELD = "completion_time"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + true, + args -> new ReindexDataStreamPersistentTaskState((long) args[0]) + ); + static { + PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD)); + } + + public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException { + this(in.readLong()); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(completionTime); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(COMPLETION_TIME_FIELD, completionTime); + builder.endObject(); + return builder; + } + + public static ReindexDataStreamPersistentTaskState fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java new file mode 100644 index 0000000000000..10dfded853a13 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public record ReindexDataStreamStatus( + long persistentTaskStartTime, + int totalIndices, + int totalIndicesToBeUpgraded, + boolean complete, + Exception exception, + int inProgress, + int pending, + List> errors +) implements Task.Status { + public ReindexDataStreamStatus { + Objects.requireNonNull(errors); + } + + public static final String NAME = "ReindexDataStreamStatus"; + + public ReindexDataStreamStatus(StreamInput in) throws IOException { + this( + in.readLong(), + in.readInt(), + in.readInt(), + in.readBoolean(), + in.readException(), + in.readInt(), + in.readInt(), + in.readCollectionAsList(in1 -> Tuple.tuple(in1.readString(), in1.readException())) + ); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(persistentTaskStartTime); + out.writeInt(totalIndices); + out.writeInt(totalIndicesToBeUpgraded); + out.writeBoolean(complete); + out.writeException(exception); + out.writeInt(inProgress); + out.writeInt(pending); + out.writeCollection(errors, (out1, tuple) -> { + out1.writeString(tuple.v1()); + out1.writeException(tuple.v2()); + }); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("start_time", persistentTaskStartTime); + builder.field("complete", complete); + builder.field("total_indices", totalIndices); + builder.field("total_indices_requiring_upgrade", totalIndicesToBeUpgraded); + builder.field("successes", totalIndicesToBeUpgraded - (inProgress + pending + errors.size())); + builder.field("in_progress", inProgress); + builder.field("pending", pending); + builder.startArray("errors"); + for (Tuple error : errors) { + builder.startObject(); + builder.field("index", error.v1()); + builder.field("message", error.v2().getMessage()); + builder.endObject(); + } + builder.endArray(); + if (exception != null) { + builder.field("exception", exception.getMessage()); + } + builder.endObject(); + return builder; + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java new file mode 100644 index 0000000000000..2ae244679659f --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.core.Tuple; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ReindexDataStreamTask extends AllocatedPersistentTask { + public static final String TASK_NAME = "reindex-data-stream"; + private final long persistentTaskStartTime; + private final int totalIndices; + private final int totalIndicesToBeUpgraded; + private final ThreadPool threadPool; + private boolean complete = false; + private Exception exception; + private List inProgress = new ArrayList<>(); + private List pending = List.of(); + private List> errors = new ArrayList<>(); + + public ReindexDataStreamTask( + long persistentTaskStartTime, + int totalIndices, + int totalIndicesToBeUpgraded, + ThreadPool threadPool, + long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers + ) { + super(id, type, action, description, parentTask, headers); + this.persistentTaskStartTime = persistentTaskStartTime; + this.totalIndices = totalIndices; + this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded; + this.threadPool = threadPool; + } + + @Override + public ReindexDataStreamStatus getStatus() { + return new ReindexDataStreamStatus( + persistentTaskStartTime, + totalIndices, + totalIndicesToBeUpgraded, + complete, + exception, + inProgress.size(), + pending.size(), + errors + ); + } + + public void reindexSucceeded() { + this.complete = true; + } + + public void reindexFailed(Exception e) { + this.complete = true; + this.exception = e; + } + + public void setInProgressIndices(List inProgressIndices) { + this.inProgress = inProgressIndices; + } + + public void setPendingIndices(List pendingIndices) { + this.pending = pendingIndices; + } + + public void addErrorIndex(String index, Exception error) { + this.errors.add(Tuple.tuple(index, error)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java new file mode 100644 index 0000000000000..5efbc6b672216 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded) + implements + PersistentTaskParams { + + public static final String NAME = ReindexDataStreamTask.TASK_NAME; + private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream"; + private static final String START_TIME_FIELD = "start_time"; + private static final String TOTAL_INDICES_FIELD = "total_indices"; + private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + true, + args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3]) + ); + static { + PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD)); + PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD)); + } + + public ReindexDataStreamTaskParams(StreamInput in) throws IOException { + this(in.readString(), in.readLong(), in.readInt(), in.readInt()); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.REINDEX_DATA_STREAMS; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(sourceDataStream); + out.writeLong(startTime); + out.writeInt(totalIndices); + out.writeInt(totalIndicesToBeUpgraded); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject() + .field(SOURCE_DATA_STREAM_FIELD, sourceDataStream) + .field(START_TIME_FIELD, startTime) + .field(TOTAL_INDICES_FIELD, totalIndices) + .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded) + .endObject(); + } + + public String getSourceDataStream() { + return sourceDataStream; + } + + public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java new file mode 100644 index 0000000000000..7cd95bca7a12a --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +public class ReindexDataStreamPersistentTaskStateTests extends AbstractXContentSerializingTestCase { + @Override + protected ReindexDataStreamPersistentTaskState doParseInstance(XContentParser parser) throws IOException { + return ReindexDataStreamPersistentTaskState.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamPersistentTaskState::new; + } + + @Override + protected ReindexDataStreamPersistentTaskState createTestInstance() { + return new ReindexDataStreamPersistentTaskState(randomNonNegativeLong()); + } + + @Override + protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException { + return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java new file mode 100644 index 0000000000000..8f0fabc2ce7ee --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static java.util.Map.entry; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamStatusTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamStatus::new; + } + + @Override + protected ReindexDataStreamStatus createTestInstance() { + return new ReindexDataStreamStatus( + randomLong(), + randomNegativeInt(), + randomNegativeInt(), + randomBoolean(), + nullableTestException(), + randomNegativeInt(), + randomNegativeInt(), + randomErrorList() + ); + } + + private Exception nullableTestException() { + if (randomBoolean()) { + return testException(); + } + return null; + } + + private Exception testException() { + /* + * Unfortunately ElasticsearchException doesn't have an equals and just falls back to Object::equals. So we can't test for equality + * when we're using an exception. So always just use null. + */ + return null; + } + + private List randomList() { + return randomList(0); + } + + private List randomList(int minSize) { + return randomList(minSize, Math.max(minSize, 100), () -> randomAlphaOfLength(50)); + } + + private List> randomErrorList() { + return randomErrorList(0); + } + + private List> randomErrorList(int minSize) { + return randomList(minSize, Math.max(minSize, 100), () -> Tuple.tuple(randomAlphaOfLength(30), testException())); + } + + @Override + protected ReindexDataStreamStatus mutateInstance(ReindexDataStreamStatus instance) throws IOException { + long startTime = instance.persistentTaskStartTime(); + int totalIndices = instance.totalIndices(); + int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded(); + boolean complete = instance.complete(); + Exception exception = instance.exception(); + int inProgress = instance.inProgress(); + int pending = instance.pending(); + List> errors = instance.errors(); + switch (randomIntBetween(0, 6)) { + case 0 -> startTime = randomLong(); + case 1 -> totalIndices = totalIndices + 1; + case 2 -> totalIndicesToBeUpgraded = totalIndicesToBeUpgraded + 1; + case 3 -> complete = complete == false; + case 4 -> inProgress = inProgress + 1; + case 5 -> pending = pending + 1; + case 6 -> errors = randomErrorList(errors.size() + 1); + default -> throw new UnsupportedOperationException(); + } + return new ReindexDataStreamStatus( + startTime, + totalIndices, + totalIndicesToBeUpgraded, + complete, + exception, + inProgress, + pending, + errors + ); + } + + public void testToXContent() throws IOException { + ReindexDataStreamStatus status = new ReindexDataStreamStatus( + 1234L, + 200, + 100, + true, + new ElasticsearchException("the whole task failed"), + 12, + 8, + List.of( + Tuple.tuple("index7", new ElasticsearchException("index7 failed")), + Tuple.tuple("index8", new ElasticsearchException("index8 " + "failed")) + ) + ); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + status.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + Map parserMap = parser.map(); + assertThat( + parserMap, + equalTo( + Map.ofEntries( + entry("start_time", 1234), + entry("total_indices", 200), + entry("total_indices_requiring_upgrade", 100), + entry("complete", true), + entry("exception", "the whole task failed"), + entry("successes", 78), + entry("in_progress", 12), + entry("pending", 8), + entry( + "errors", + List.of( + Map.of("index", "index7", "message", "index7 failed"), + Map.of("index", "index8", "message", "index8 failed") + ) + ) + ) + ) + ); + } + } + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java new file mode 100644 index 0000000000000..55098bf4a68d5 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamTaskParams::new; + } + + @Override + protected ReindexDataStreamTaskParams createTestInstance() { + return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt()); + } + + @Override + protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams instance) { + String sourceDataStream = instance.sourceDataStream(); + long startTime = instance.startTime(); + int totalIndices = instance.totalIndices(); + int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded(); + switch (randomIntBetween(0, 3)) { + case 0 -> sourceDataStream = randomAlphaOfLength(50); + case 1 -> startTime = randomLong(); + case 2 -> totalIndices = totalIndices + 1; + case 3 -> totalIndices = totalIndicesToBeUpgraded + 1; + default -> throw new UnsupportedOperationException(); + } + return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded); + } + + @Override + protected ReindexDataStreamTaskParams doParseInstance(XContentParser parser) { + return ReindexDataStreamTaskParams.fromXContent(parser); + } + + public void testToXContent() throws IOException { + ReindexDataStreamTaskParams params = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + params.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + Map parserMap = parser.map(); + assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream())); + assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime())); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 893ed25c253f3..9cfcc0a381e17 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -205,6 +205,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0); public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0); public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0); + public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java new file mode 100644 index 0000000000000..814c512c43bec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class ReindexDataStreamAction extends ActionType { + + public static final ReindexDataStreamAction INSTANCE = new ReindexDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/reindex"; + + public ReindexDataStreamAction() { + super(NAME); + } + + public static class ReindexDataStreamResponse extends ActionResponse implements ToXContentObject { + private final String taskId; + + public ReindexDataStreamResponse(String taskId) { + super(); + this.taskId = taskId; + } + + public ReindexDataStreamResponse(StreamInput in) throws IOException { + super(in); + this.taskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(taskId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("task", getTaskId()); + builder.endObject(); + return builder; + } + + public String getTaskId() { + return taskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(taskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof ReindexDataStreamResponse && taskId.equals(((ReindexDataStreamResponse) other).taskId); + } + + } + + public static class ReindexDataStreamRequest extends ActionRequest { + private final String sourceDataStream; + + public ReindexDataStreamRequest(String sourceDataStream) { + super(); + this.sourceDataStream = sourceDataStream; + } + + public ReindexDataStreamRequest(StreamInput in) throws IOException { + super(in); + this.sourceDataStream = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceDataStream); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; // do not wait_for_completion + } + + public String getSourceDataStream() { + return sourceDataStream; + } + + @Override + public int hashCode() { + return Objects.hashCode(sourceDataStream); + } + + @Override + public boolean equals(Object other) { + return other instanceof ReindexDataStreamRequest + && sourceDataStream.equals(((ReindexDataStreamRequest) other).sourceDataStream); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java new file mode 100644 index 0000000000000..fe839c28aab88 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamResponse::new; + } + + @Override + protected ReindexDataStreamResponse createTestInstance() { + return new ReindexDataStreamResponse(randomAlphaOfLength(40)); + } + + @Override + protected ReindexDataStreamResponse mutateInstance(ReindexDataStreamResponse instance) { + return createTestInstance(); + } + + public void testToXContent() throws IOException { + ReindexDataStreamResponse response = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + response.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + assertThat(parser.map(), equalTo(Map.of("task", response.getTaskId()))); + } + } + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 82408f71213b3..bfff63442281d 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -491,6 +491,7 @@ public class Constants { "indices:admin/block/add[s]", "indices:admin/cache/clear", "indices:admin/data_stream/lazy_rollover", + "indices:admin/data_stream/reindex", "indices:internal/admin/ccr/restore/file_chunk/get", "indices:internal/admin/ccr/restore/session/clear", "indices:internal/admin/ccr/restore/session/put",