Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove closed indices from cluster state #9334

Merged
merged 1 commit into from Jan 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/tribe/TribeService.java
Expand Up @@ -255,7 +255,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
String markedTribeName = index.settings().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) {
if (tribeIndex == null || tribeIndex.state() == IndexMetaData.State.CLOSE) {
logger.info("[{}] removing index [{}]", tribeName, index.index());
removeIndex(blocks, metaData, routingTable, index);
} else {
Expand Down
104 changes: 81 additions & 23 deletions src/test/java/org/elasticsearch/tribe/TribeTests.java
Expand Up @@ -20,12 +20,15 @@
package org.elasticsearch.tribe;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -34,6 +37,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -44,6 +48,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

Expand Down Expand Up @@ -125,7 +130,7 @@ private void setupTribeNode(Settings settings) {
public void testGlobalReadWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));


setupTribeNode(ImmutableSettings.builder()
Expand Down Expand Up @@ -162,10 +167,10 @@ public void testGlobalReadWriteBlocks() throws Exception {
@Test
public void testIndexWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
internalCluster().client().admin().indices().prepareCreate("block_test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
cluster2.client().admin().indices().prepareCreate("block_test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2"));

setupTribeNode(ImmutableSettings.builder()
.put("tribe.blocks.write.indices", "block_*")
Expand Down Expand Up @@ -226,10 +231,10 @@ private void testOnConflictPrefer(String tribe) throws Exception {
logger.info("testing preference for tribe {}", tribe);

logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("conflict").get();
cluster2.client().admin().indices().prepareCreate("conflict").get();
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict"));
assertAcked(cluster2.client().admin().indices().prepareCreate("conflict"));
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));

setupTribeNode(ImmutableSettings.builder()
.put("tribe.on_conflict", "prefer_" + tribe)
Expand All @@ -249,8 +254,8 @@ private void testOnConflictPrefer(String tribe) throws Exception {
public void testTribeOnOneCluster() throws Exception {
setupTribeNode(ImmutableSettings.EMPTY);
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));


// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
Expand Down Expand Up @@ -283,7 +288,7 @@ public void run() {
logger.info("write to another type");
tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get();
tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get();
tribeClient.admin().indices().prepareRefresh().get();
assertNoFailures(tribeClient.admin().indices().prepareRefresh().get());


logger.info("verify they are there");
Expand All @@ -310,35 +315,88 @@ public void run() {

logger.info("delete an index, and make sure its reflected");
cluster2.client().admin().indices().prepareDelete("test2").get();
awaitIndicesNotInClusterState("test2");

try {
logger.info("stop a node, make sure its reflected");
cluster2.stopRandomDataNode();
awaitSameNodeCounts();
} finally {
cluster2.startNode();
awaitSameNodeCounts();
}
}

@Test
public void testCloseAndOpenIndex() throws Exception {
//create an index and close it even before starting the tribe node
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
ensureGreen(internalCluster());
assertAcked(internalCluster().client().admin().indices().prepareClose("test1"));

setupTribeNode(ImmutableSettings.EMPTY);
awaitSameNodeCounts();

//the closed index is not part of the tribe node cluster state
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false));

//open the index, it becomes part of the tribe node cluster state
assertAcked(internalCluster().client().admin().indices().prepareOpen("test1"));
awaitIndicesInClusterState("test1");
ensureGreen(internalCluster());

//create a second index, wait till it is seen from within the tribe node
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);

//close the second index, wait till it gets removed from the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareClose("test2"));
awaitIndicesNotInClusterState("test2");

//open the second index, wait till it gets added back to the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareOpen("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);
}

private void awaitIndicesInClusterState(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertTrue(tribeState.getMetaData().hasIndex("test1"));
assertFalse(tribeState.getMetaData().hasIndex("test2"));
assertTrue(tribeState.getRoutingTable().hasIndex("test1"));
assertFalse(tribeState.getRoutingTable().hasIndex("test2"));
for (String index : indices) {
assertTrue(tribeState.getMetaData().hasIndex(index));
assertTrue(tribeState.getRoutingTable().hasIndex(index));
}
}
});

logger.info("stop a node, make sure its reflected");
cluster2.stopRandomDataNode();
awaitSameNodeCounts();
}

private void awaitIndicesInClusterState(final String... indices) throws Exception {
private void awaitIndicesNotInClusterState(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertTrue(tribeState.getMetaData().hasIndex(index));
assertTrue(tribeState.getRoutingTable().hasIndex(index));
assertFalse(tribeState.getMetaData().hasIndex(index));
assertFalse(tribeState.getRoutingTable().hasIndex(index));
}
}
});
}

private void ensureGreen(TestCluster testCluster) {
ClusterHealthResponse actionGet = testCluster.client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

private void awaitSameNodeCounts() throws Exception {
assertBusy(new Runnable() {
@Override
Expand Down