diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index bf645d6c39834..fc4ae4a19bab8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -44,7 +44,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -80,7 +80,7 @@ TimeValue timeout() { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults * to 10s. */ public CloseIndexRequest timeout(TimeValue timeout) { @@ -89,7 +89,7 @@ public CloseIndexRequest timeout(TimeValue timeout) { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults * to 10s. */ public CloseIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java index fbcece7273b59..e62ae7c499415 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java @@ -44,7 +44,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -80,7 +80,7 @@ TimeValue timeout() { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults * to 10s. */ public OpenIndexRequest timeout(TimeValue timeout) { @@ -89,7 +89,7 @@ public OpenIndexRequest timeout(TimeValue timeout) { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults * to 10s. */ public OpenIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index abf7a6af23f11..305b539e25b21 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,12 +20,14 @@ package org.elasticsearch.cluster; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import java.util.List; +import java.util.Map; /** * @@ -161,4 +163,22 @@ public boolean nodesAdded() { public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } -} + + public boolean indicesStateChanged() { + if (metaDataChanged()) { + ImmutableMap indices = state.metaData().indices(); + ImmutableMap previousIndices = previousState.metaData().indices(); + + for (Map.Entry entry : indices.entrySet()) { + IndexMetaData indexMetaData = entry.getValue(); + IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey()); + if (previousIndexMetaData != null + && indexMetaData.state() != previousIndexMetaData.state()) { + return true; + } + } + } + + return false; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 2adf89e519692..5df5395f96cc8 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -77,5 +77,6 @@ protected void configure() { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(NodeAliasesUpdatedAction.class).asEagerSingleton(); + bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java new file mode 100644 index 0000000000000..ad9394fae53fe --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java @@ -0,0 +1,154 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.cluster.action.index; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class NodeIndicesStateUpdatedAction extends AbstractComponent { + + private final ThreadPool threadPool; + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final List listeners = new CopyOnWriteArrayList(); + + @Inject + public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterService = clusterService; + transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler()); + } + + public void add(final Listener listener, TimeValue timeout) { + listeners.add(listener); + threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { + @Override + public void run() { + boolean removed = listeners.remove(listener); + if (removed) { + listener.onTimeout(); + } + } + }); + } + + public void remove(Listener listener) { + listeners.remove(listener); + } + + public void nodeIndexStateUpdated(final NodeIndexStateUpdatedResponse response) throws ElasticSearchException { + DiscoveryNodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + innerNodeIndexStateUpdated(response); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); + } + } + + private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) { + for (Listener listener : listeners) { + listener.onIndexStateUpdated(response); + } + } + + private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeIndexStateUpdated"; + + @Override + public NodeIndexStateUpdatedResponse newInstance() { + return new NodeIndexStateUpdatedResponse(); + } + + @Override + public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception { + innerNodeIndexStateUpdated(response); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + public static interface Listener { + void onIndexStateUpdated(NodeIndexStateUpdatedResponse response); + void onTimeout(); + } + + public static class NodeIndexStateUpdatedResponse extends TransportRequest { + private String nodeId; + private long version; + + NodeIndexStateUpdatedResponse() { + } + + public NodeIndexStateUpdatedResponse(String nodeId, long version) { + this.nodeId = nodeId; + this.version = version; + } + + public String nodeId() { + return nodeId; + } + + public long version() { + return version; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeLong(version); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + version = in.readLong(); + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 9803132eafc4f..272f76b9b3e74 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -41,8 +42,13 @@ import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException; import org.elasticsearch.rest.RestStatus; + import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + + /** * */ @@ -54,11 +60,14 @@ public class MetaDataStateIndexService extends AbstractComponent { private final AllocationService allocationService; + private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction; + @Inject - public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { + public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) { super(settings); this.clusterService = clusterService; this.allocationService = allocationService; + this.indicesStateUpdatedAction = indicesStateUpdatedAction; } public void closeIndex(final Request request, final Listener listener) { @@ -108,12 +117,19 @@ public ClusterState execute(ClusterState currentState) { RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } @@ -158,16 +174,32 @@ public ClusterState execute(ClusterState currentState) { RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; + } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } + private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) { + // wait for responses from other nodes if needed + int responseCount = updatedState.nodes().size(); + long version = updatedState.version() + 1; + logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version); + indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout); + } + public static interface Listener { void onResponse(Response response); @@ -208,4 +240,39 @@ public boolean acknowledged() { return acknowledged; } } + + private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener { + + private final AtomicBoolean notified = new AtomicBoolean(); + private final AtomicInteger countDown; + private final Listener listener; + private final long version; + + public CountDownListener(int countDown, Listener listener, long version) { + this.countDown = new AtomicInteger(countDown); + this.listener = listener; + this.version = version; + } + + @Override + public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) { + if (version <= response.version()) { + logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); + if (countDown.decrementAndGet() == 0) { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(true)); + } + } + } + } + + @Override + public void onTimeout() { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(false)); + } + } + } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11dd3f0a33f63..82654cdf389f7 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -86,6 +86,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent clients() { + return cluster().clients(); + } + public ImmutableSettings.Builder randomSettingsBuilder() { // TODO RANDOMIZE return ImmutableSettings.builder(); diff --git a/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java new file mode 100644 index 0000000000000..7bbba96e16700 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java @@ -0,0 +1,220 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.state; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.IgnoreIndices; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.test.integration.AbstractSharedClusterTest; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class OpenCloseIndexTests extends AbstractSharedClusterTest { + + @Test + public void testSimpleCloseOpen() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test(expected = IndexMissingException.class) + public void testSimpleCloseMissingIndex() { + Client client = client(); + client.admin().indices().prepareClose("test1").execute().actionGet(); + } + + @Test(expected = IndexMissingException.class) + public void testSimpleOpenMissingIndex() { + Client client = client(); + client.admin().indices().prepareOpen("test1").execute().actionGet(); + } + + @Test + public void testCloseOpenMultipleIndices() { + Client client = client(); + createIndex("test1", "test2", "test3"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse1 = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse1.isAcknowledged(), equalTo(true)); + CloseIndexResponse closeIndexResponse2 = client.admin().indices().prepareClose("test2").execute().actionGet(); + assertThat(closeIndexResponse2.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1", "test2"); + assertIndexIsOpened("test3"); + + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + OpenIndexResponse openIndexResponse2 = client.admin().indices().prepareOpen("test2").execute().actionGet(); + assertThat(openIndexResponse2.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1", "test2", "test3"); + } + + @Test(expected = ActionRequestValidationException.class) + public void testCloseNullIndex() { + Client client = client(); + client.admin().indices().prepareClose(null).execute().actionGet(); + } + + @Test(expected = ActionRequestValidationException.class) + public void testOpenNullIndex() { + Client client = client(); + client.admin().indices().prepareOpen(null).execute().actionGet(); + } + + @Test + public void testOpenAlreadyOpenedIndex() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + //no problem if we try to open an index that's already in open state + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test + public void testCloseAlreadyClosedIndex() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + //closing the index + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + //no problem if we try to close an index that's already in close state + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test + public void testSimpleCloseOpenAlias() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + IndicesAliasesResponse aliasesResponse = client.admin().indices().prepareAliases().addAlias("test1", "test1-alias").execute().actionGet(); + assertThat(aliasesResponse.isAcknowledged(), equalTo(true)); + + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1-alias").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1-alias").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test (expected = ElasticSearchIllegalArgumentException.class) + public void testCloseOpenAliasMultipleIndices() { + Client client = client(); + createIndex("test1", "test2"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + IndicesAliasesResponse aliasesResponse1 = client.admin().indices().prepareAliases().addAlias("test1", "test-alias").execute().actionGet(); + assertThat(aliasesResponse1.isAcknowledged(), equalTo(true)); + IndicesAliasesResponse aliasesResponse2 = client.admin().indices().prepareAliases().addAlias("test2", "test-alias").execute().actionGet(); + assertThat(aliasesResponse2.isAcknowledged(), equalTo(true)); + + client.admin().indices().prepareClose("test-alias").execute().actionGet(); + //Alias [test-alias] has more than one indices associated with it [[test1, test2]], can't execute a single index op + } + + @Test + public void testSimpleCloseOpenAcknowledged() { + createIndex("test1"); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosedOnAllNodes("test1"); + + OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpenedOnAllNodes("test1"); + } + + private void assertIndexIsOpened(String... indices) { + checkIndexState(IndexMetaData.State.OPEN, indices); + } + + private void assertIndexIsClosed(String... indices) { + checkIndexState(IndexMetaData.State.CLOSE, indices); + } + + private void assertIndexIsOpenedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices); + } + + private void assertIndexIsClosedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices); + } + + private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) { + //we explicitly check the cluster state on all nodes forcing the local execution + // we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); + checkIndexState(state, clusterStateResponse, indices); + } + } + + private void checkIndexState(IndexMetaData.State expectedState, String... indices) { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); + checkIndexState(expectedState, clusterStateResponse, indices); + } + + private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) { + for (String index : indices) { + IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index); + assertThat(indexMetaData, notNullValue()); + assertThat(indexMetaData.getState(), equalTo(expectedState)); + } + } +} \ No newline at end of file