From 6ab76a526227f930e45b75bf686b2b551f1b15a9 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Sat, 27 Jul 2013 10:42:02 +0200 Subject: [PATCH] Added support for acknowledgement from other nodes in open/close index api The open/close index api now waits for an acknowledgement from all the other nodes before returning its response, till the timeout (configurable, default 10 secs) expires. The returned acknowledged flag reflects whether the cluster state change was acknowledged by all the nodes or the timeout expired before. Closes #3400 --- .../indices/close/CloseIndexRequest.java | 8 +- .../admin/indices/open/OpenIndexRequest.java | 8 +- .../cluster/ClusterChangedEvent.java | 22 +- .../elasticsearch/cluster/ClusterModule.java | 1 + .../index/NodeIndicesStateUpdatedAction.java | 154 ++++++++++++ .../metadata/MetaDataStateIndexService.java | 77 +++++- .../cluster/IndicesClusterStateService.java | 12 +- .../AbstractSharedClusterTest.java | 4 + .../indices/state/OpenCloseIndexTests.java | 220 ++++++++++++++++++ 9 files changed, 491 insertions(+), 15 deletions(-) create mode 100644 src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java create mode 100644 src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index bf645d6c39834..fc4ae4a19bab8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -44,7 +44,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -80,7 +80,7 @@ TimeValue timeout() { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults * to 10s. */ public CloseIndexRequest timeout(TimeValue timeout) { @@ -89,7 +89,7 @@ public CloseIndexRequest timeout(TimeValue timeout) { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults * to 10s. */ public CloseIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java index fbcece7273b59..e62ae7c499415 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java @@ -44,7 +44,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -80,7 +80,7 @@ TimeValue timeout() { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults * to 10s. */ public OpenIndexRequest timeout(TimeValue timeout) { @@ -89,7 +89,7 @@ public OpenIndexRequest timeout(TimeValue timeout) { } /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults + * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults * to 10s. */ public OpenIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index abf7a6af23f11..305b539e25b21 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,12 +20,14 @@ package org.elasticsearch.cluster; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import java.util.List; +import java.util.Map; /** * @@ -161,4 +163,22 @@ public boolean nodesAdded() { public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } -} + + public boolean indicesStateChanged() { + if (metaDataChanged()) { + ImmutableMap indices = state.metaData().indices(); + ImmutableMap previousIndices = previousState.metaData().indices(); + + for (Map.Entry entry : indices.entrySet()) { + IndexMetaData indexMetaData = entry.getValue(); + IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey()); + if (previousIndexMetaData != null + && indexMetaData.state() != previousIndexMetaData.state()) { + return true; + } + } + } + + return false; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 2adf89e519692..5df5395f96cc8 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -77,5 +77,6 @@ protected void configure() { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(NodeAliasesUpdatedAction.class).asEagerSingleton(); + bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java new file mode 100644 index 0000000000000..ad9394fae53fe --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java @@ -0,0 +1,154 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.cluster.action.index; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class NodeIndicesStateUpdatedAction extends AbstractComponent { + + private final ThreadPool threadPool; + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final List listeners = new CopyOnWriteArrayList(); + + @Inject + public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterService = clusterService; + transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler()); + } + + public void add(final Listener listener, TimeValue timeout) { + listeners.add(listener); + threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { + @Override + public void run() { + boolean removed = listeners.remove(listener); + if (removed) { + listener.onTimeout(); + } + } + }); + } + + public void remove(Listener listener) { + listeners.remove(listener); + } + + public void nodeIndexStateUpdated(final NodeIndexStateUpdatedResponse response) throws ElasticSearchException { + DiscoveryNodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + innerNodeIndexStateUpdated(response); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); + } + } + + private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) { + for (Listener listener : listeners) { + listener.onIndexStateUpdated(response); + } + } + + private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeIndexStateUpdated"; + + @Override + public NodeIndexStateUpdatedResponse newInstance() { + return new NodeIndexStateUpdatedResponse(); + } + + @Override + public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception { + innerNodeIndexStateUpdated(response); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + public static interface Listener { + void onIndexStateUpdated(NodeIndexStateUpdatedResponse response); + void onTimeout(); + } + + public static class NodeIndexStateUpdatedResponse extends TransportRequest { + private String nodeId; + private long version; + + NodeIndexStateUpdatedResponse() { + } + + public NodeIndexStateUpdatedResponse(String nodeId, long version) { + this.nodeId = nodeId; + this.version = version; + } + + public String nodeId() { + return nodeId; + } + + public long version() { + return version; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeLong(version); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + version = in.readLong(); + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 9803132eafc4f..272f76b9b3e74 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -41,8 +42,13 @@ import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException; import org.elasticsearch.rest.RestStatus; + import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + + /** * */ @@ -54,11 +60,14 @@ public class MetaDataStateIndexService extends AbstractComponent { private final AllocationService allocationService; + private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction; + @Inject - public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { + public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) { super(settings); this.clusterService = clusterService; this.allocationService = allocationService; + this.indicesStateUpdatedAction = indicesStateUpdatedAction; } public void closeIndex(final Request request, final Listener listener) { @@ -108,12 +117,19 @@ public ClusterState execute(ClusterState currentState) { RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } @@ -158,16 +174,32 @@ public ClusterState execute(ClusterState currentState) { RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; + } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } + private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) { + // wait for responses from other nodes if needed + int responseCount = updatedState.nodes().size(); + long version = updatedState.version() + 1; + logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version); + indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout); + } + public static interface Listener { void onResponse(Response response); @@ -208,4 +240,39 @@ public boolean acknowledged() { return acknowledged; } } + + private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener { + + private final AtomicBoolean notified = new AtomicBoolean(); + private final AtomicInteger countDown; + private final Listener listener; + private final long version; + + public CountDownListener(int countDown, Listener listener, long version) { + this.countDown = new AtomicInteger(countDown); + this.listener = listener; + this.version = version; + } + + @Override + public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) { + if (version <= response.version()) { + logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); + if (countDown.decrementAndGet() == 0) { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(true)); + } + } + } + } + + @Override + public void onTimeout() { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(false)); + } + } + } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11dd3f0a33f63..82654cdf389f7 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -86,6 +86,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent clients() { + return cluster().clients(); + } + public ImmutableSettings.Builder randomSettingsBuilder() { // TODO RANDOMIZE return ImmutableSettings.builder(); diff --git a/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java new file mode 100644 index 0000000000000..7bbba96e16700 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java @@ -0,0 +1,220 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.state; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.IgnoreIndices; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.test.integration.AbstractSharedClusterTest; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class OpenCloseIndexTests extends AbstractSharedClusterTest { + + @Test + public void testSimpleCloseOpen() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test(expected = IndexMissingException.class) + public void testSimpleCloseMissingIndex() { + Client client = client(); + client.admin().indices().prepareClose("test1").execute().actionGet(); + } + + @Test(expected = IndexMissingException.class) + public void testSimpleOpenMissingIndex() { + Client client = client(); + client.admin().indices().prepareOpen("test1").execute().actionGet(); + } + + @Test + public void testCloseOpenMultipleIndices() { + Client client = client(); + createIndex("test1", "test2", "test3"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse1 = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse1.isAcknowledged(), equalTo(true)); + CloseIndexResponse closeIndexResponse2 = client.admin().indices().prepareClose("test2").execute().actionGet(); + assertThat(closeIndexResponse2.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1", "test2"); + assertIndexIsOpened("test3"); + + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + OpenIndexResponse openIndexResponse2 = client.admin().indices().prepareOpen("test2").execute().actionGet(); + assertThat(openIndexResponse2.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1", "test2", "test3"); + } + + @Test(expected = ActionRequestValidationException.class) + public void testCloseNullIndex() { + Client client = client(); + client.admin().indices().prepareClose(null).execute().actionGet(); + } + + @Test(expected = ActionRequestValidationException.class) + public void testOpenNullIndex() { + Client client = client(); + client.admin().indices().prepareOpen(null).execute().actionGet(); + } + + @Test + public void testOpenAlreadyOpenedIndex() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + //no problem if we try to open an index that's already in open state + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test + public void testCloseAlreadyClosedIndex() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + //closing the index + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + //no problem if we try to close an index that's already in close state + OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse1.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test + public void testSimpleCloseOpenAlias() { + Client client = client(); + createIndex("test1"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + IndicesAliasesResponse aliasesResponse = client.admin().indices().prepareAliases().addAlias("test1", "test1-alias").execute().actionGet(); + assertThat(aliasesResponse.isAcknowledged(), equalTo(true)); + + CloseIndexResponse closeIndexResponse = client.admin().indices().prepareClose("test1-alias").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosed("test1"); + + OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1-alias").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpened("test1"); + } + + @Test (expected = ElasticSearchIllegalArgumentException.class) + public void testCloseOpenAliasMultipleIndices() { + Client client = client(); + createIndex("test1", "test2"); + ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + IndicesAliasesResponse aliasesResponse1 = client.admin().indices().prepareAliases().addAlias("test1", "test-alias").execute().actionGet(); + assertThat(aliasesResponse1.isAcknowledged(), equalTo(true)); + IndicesAliasesResponse aliasesResponse2 = client.admin().indices().prepareAliases().addAlias("test2", "test-alias").execute().actionGet(); + assertThat(aliasesResponse2.isAcknowledged(), equalTo(true)); + + client.admin().indices().prepareClose("test-alias").execute().actionGet(); + //Alias [test-alias] has more than one indices associated with it [[test1, test2]], can't execute a single index op + } + + @Test + public void testSimpleCloseOpenAcknowledged() { + createIndex("test1"); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosedOnAllNodes("test1"); + + OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpenedOnAllNodes("test1"); + } + + private void assertIndexIsOpened(String... indices) { + checkIndexState(IndexMetaData.State.OPEN, indices); + } + + private void assertIndexIsClosed(String... indices) { + checkIndexState(IndexMetaData.State.CLOSE, indices); + } + + private void assertIndexIsOpenedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices); + } + + private void assertIndexIsClosedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices); + } + + private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) { + //we explicitly check the cluster state on all nodes forcing the local execution + // we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); + checkIndexState(state, clusterStateResponse, indices); + } + } + + private void checkIndexState(IndexMetaData.State expectedState, String... indices) { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); + checkIndexState(expectedState, clusterStateResponse, indices); + } + + private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) { + for (String index : indices) { + IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index); + assertThat(indexMetaData, notNullValue()); + assertThat(indexMetaData.getState(), equalTo(expectedState)); + } + } +} \ No newline at end of file