From 19593884eef94392a48d5ba05c0255edb4ea0715 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Jan 2019 09:43:50 +0100 Subject: [PATCH] [Close Index API] Propagate tasks ids between Freeze, Close and Verify Shard actions (#36630) This pull request changes the Freeze Index and Close Index actions so that these actions always requires a Task. The task's id is then propagated from the Freeze action to the Close action, and then to the Verify shard action. This way it is possible to track which Freeze task initiates the closing of an index, and which consecutive verifiy shard are executed for the index closing. --- .../CloseIndexClusterStateUpdateRequest.java | 8 ++++- .../close/TransportCloseIndexAction.java | 8 ++++- ...TransportVerifyShardBeforeCloseAction.java | 4 ++- .../metadata/MetaDataIndexStateService.java | 30 +++++++++---------- ...portVerifyShardBeforeCloseActionTests.java | 6 ++-- .../action/TransportFreezeIndexAction.java | 8 ++++- 6 files changed, 42 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index 8ad79f1676eb1..bb0f98ac07b7e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -25,7 +25,13 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - public CloseIndexClusterStateUpdateRequest() { + private final long taskId; + public CloseIndexClusterStateUpdateRequest(final long taskId) { + this.taskId = taskId; + } + + public long taskId() { + return taskId; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 605f0ed9217ac..bb3db084b0c53 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -99,13 +99,19 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta @Override protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state, + final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { listener.onResponse(new AcknowledgedResponse(true)); return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 1d283cbe004d0..f603f92a7189e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -141,8 +142,9 @@ public static class ShardRequest extends ReplicationRequest { ShardRequest(){ } - public ShardRequest(final ShardId shardId) { + public ShardRequest(final ShardId shardId, final TaskId parentTaskId) { super(shardId); + setParentTask(parentTaskId); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index cda8f9c6f0ac6..6ceda4bf57d13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -46,7 +46,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -63,6 +62,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -120,9 +120,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina throw new IllegalArgumentException("Index name is required"); } - final TimeValue timeout = request.ackTimeout(); - final TimeValue masterTimeout = request.masterNodeTimeout(); - clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { @@ -141,7 +138,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, + .execute(new WaitForClosedBlocksApplied(blockedIndices, request, ActionListener.wrap(closedBlocksResults -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -176,7 +173,7 @@ public void onFailure(final String source, final Exception e) { @Override public TimeValue timeout() { - return masterTimeout; + return request.masterNodeTimeout(); } } ); @@ -246,18 +243,18 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta class WaitForClosedBlocksApplied extends AbstractRunnable { private final Set blockedIndices; - private final @Nullable TimeValue timeout; + private final CloseIndexClusterStateUpdateRequest request; private final ActionListener> listener; private WaitForClosedBlocksApplied(final Set blockedIndices, - final @Nullable TimeValue timeout, + final CloseIndexClusterStateUpdateRequest request, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); } this.blockedIndices = blockedIndices; + this.request = request; this.listener = listener; - this.timeout = timeout; } @Override @@ -271,7 +268,7 @@ protected void doRun() throws Exception { final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> { + waitForShardsReadyForClosing(blockedIndex, state, response -> { results.put(blockedIndex, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); @@ -280,7 +277,7 @@ protected void doRun() throws Exception { } } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout, + private void waitForShardsReadyForClosing(final Index index, final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { @@ -302,7 +299,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -326,7 +323,7 @@ private void processIfFinished() { } } - private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout, + private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -336,10 +333,11 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar listener.onResponse(response); return; } + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); - if (timeout != null) { - shardRequest.timeout(timeout); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId); + if (request.ackTimeout() != null) { + shardRequest.timeout(request.ackTimeout()); } // TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests transportVerifyShardBeforeCloseAction.execute(shardRequest, listener); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 2764eee798e6b..c0da96ed1efb7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -130,7 +131,7 @@ public static void afterClass() { private void executeOnPrimaryOrReplica() throws Exception { final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId()); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong())); if (randomBoolean()) { assertNotNull(action.shardOperationOnPrimary(request, indexShard)); } else { @@ -204,7 +205,8 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); final PlainActionFuture listener = new PlainActionFuture<>(); - TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + TransportVerifyShardBeforeCloseAction.ShardRequest request = + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L)); ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); ReplicationOperation operation = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index 36cce46d47c46..3031ec5b2a409 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -109,13 +109,19 @@ private Index[] resolveIndices(FreezeRequest request, ClusterState state) { @Override protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state, + ActionListener listener) throws Exception { final Index[] concreteIndices = resolveIndices(request, state); if (concreteIndices.length == 0) { listener.onResponse(new FreezeResponse(true, true)); return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices);