diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index 8ad79f1676eb1..bb0f98ac07b7e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -25,7 +25,13 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - public CloseIndexClusterStateUpdateRequest() { + private final long taskId; + public CloseIndexClusterStateUpdateRequest(final long taskId) { + this.taskId = taskId; + } + + public long taskId() { + return taskId; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 605f0ed9217ac..bb3db084b0c53 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -99,13 +99,19 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta @Override protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state, + final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { listener.onResponse(new AcknowledgedResponse(true)); return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 1d283cbe004d0..f603f92a7189e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -141,8 +142,9 @@ public static class ShardRequest extends ReplicationRequest { ShardRequest(){ } - public ShardRequest(final ShardId shardId) { + public ShardRequest(final ShardId shardId, final TaskId parentTaskId) { super(shardId); + setParentTask(parentTaskId); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index cda8f9c6f0ac6..6ceda4bf57d13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -46,7 +46,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -63,6 +62,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -120,9 +120,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina throw new IllegalArgumentException("Index name is required"); } - final TimeValue timeout = request.ackTimeout(); - final TimeValue masterTimeout = request.masterNodeTimeout(); - clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { @@ -141,7 +138,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, + .execute(new WaitForClosedBlocksApplied(blockedIndices, request, ActionListener.wrap(closedBlocksResults -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -176,7 +173,7 @@ public void onFailure(final String source, final Exception e) { @Override public TimeValue timeout() { - return masterTimeout; + return request.masterNodeTimeout(); } } ); @@ -246,18 +243,18 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta class WaitForClosedBlocksApplied extends AbstractRunnable { private final Set blockedIndices; - private final @Nullable TimeValue timeout; + private final CloseIndexClusterStateUpdateRequest request; private final ActionListener> listener; private WaitForClosedBlocksApplied(final Set blockedIndices, - final @Nullable TimeValue timeout, + final CloseIndexClusterStateUpdateRequest request, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); } this.blockedIndices = blockedIndices; + this.request = request; this.listener = listener; - this.timeout = timeout; } @Override @@ -271,7 +268,7 @@ protected void doRun() throws Exception { final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> { + waitForShardsReadyForClosing(blockedIndex, state, response -> { results.put(blockedIndex, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); @@ -280,7 +277,7 @@ protected void doRun() throws Exception { } } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout, + private void waitForShardsReadyForClosing(final Index index, final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { @@ -302,7 +299,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -326,7 +323,7 @@ private void processIfFinished() { } } - private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout, + private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -336,10 +333,11 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar listener.onResponse(response); return; } + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); - if (timeout != null) { - shardRequest.timeout(timeout); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId); + if (request.ackTimeout() != null) { + shardRequest.timeout(request.ackTimeout()); } // TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests transportVerifyShardBeforeCloseAction.execute(shardRequest, listener); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 2764eee798e6b..c0da96ed1efb7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -130,7 +131,7 @@ public static void afterClass() { private void executeOnPrimaryOrReplica() throws Exception { final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId()); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong())); if (randomBoolean()) { assertNotNull(action.shardOperationOnPrimary(request, indexShard)); } else { @@ -204,7 +205,8 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); final PlainActionFuture listener = new PlainActionFuture<>(); - TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + TransportVerifyShardBeforeCloseAction.ShardRequest request = + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L)); ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); ReplicationOperation operation = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index 36cce46d47c46..3031ec5b2a409 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -109,13 +109,19 @@ private Index[] resolveIndices(FreezeRequest request, ClusterState state) { @Override protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state, + ActionListener listener) throws Exception { final Index[] concreteIndices = resolveIndices(request, state); if (concreteIndices.length == 0) { listener.onResponse(new FreezeResponse(true, true)); return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices);