Skip to content

Commit

Permalink
Tribe node: remove closed indices from cluster state
Browse files Browse the repository at this point in the history
At startup the tribe node ignores closed indices, but if you closed an index that was part of the tribe node cluster state, its state change was not currently handled. A NullPointerException could be seen in the logs instead as the routing table for the closed index was null. As a result, the index stayed in the tribe node cluster state in open state, although that didn't reflect reality. Also, subsequent cluster state updates happening in the tribe node kept failing, affecting updates related to any other index. The only way to recover from this was to restart the tribe node every time an index is closed on any tribe.

This commit properly handles index state changes, making sure that when an index gets closed it gets removed from the tribe node cluster state. Note that it makes little sense to keep the closed index around in the tribe node, as from the tribe node you can't do anything with it. The tribe node simply doesn't see any closed index, it's the same as if they didn't exist.

Closes #6411
Closes #9334
  • Loading branch information
javanna committed Jan 19, 2015
1 parent ff48c64 commit 6c2139c
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/tribe/TribeService.java
Expand Up @@ -255,7 +255,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
String markedTribeName = index.settings().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) {
if (tribeIndex == null || tribeIndex.state() == IndexMetaData.State.CLOSE) {
logger.info("[{}] removing index [{}]", tribeName, index.index());
removeIndex(blocks, metaData, routingTable, index);
} else {
Expand Down
104 changes: 81 additions & 23 deletions src/test/java/org/elasticsearch/tribe/TribeTests.java
Expand Up @@ -20,12 +20,15 @@
package org.elasticsearch.tribe;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -34,6 +37,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -44,6 +48,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

Expand Down Expand Up @@ -125,7 +130,7 @@ private void setupTribeNode(Settings settings) {
public void testGlobalReadWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));


setupTribeNode(ImmutableSettings.builder()
Expand Down Expand Up @@ -162,10 +167,10 @@ public void testGlobalReadWriteBlocks() throws Exception {
@Test
public void testIndexWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
internalCluster().client().admin().indices().prepareCreate("block_test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
cluster2.client().admin().indices().prepareCreate("block_test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2"));

setupTribeNode(ImmutableSettings.builder()
.put("tribe.blocks.write.indices", "block_*")
Expand Down Expand Up @@ -226,10 +231,10 @@ private void testOnConflictPrefer(String tribe) throws Exception {
logger.info("testing preference for tribe {}", tribe);

logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("conflict").get();
cluster2.client().admin().indices().prepareCreate("conflict").get();
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict"));
assertAcked(cluster2.client().admin().indices().prepareCreate("conflict"));
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));

setupTribeNode(ImmutableSettings.builder()
.put("tribe.on_conflict", "prefer_" + tribe)
Expand All @@ -249,8 +254,8 @@ private void testOnConflictPrefer(String tribe) throws Exception {
public void testTribeOnOneCluster() throws Exception {
setupTribeNode(ImmutableSettings.EMPTY);
logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));


// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
Expand Down Expand Up @@ -283,7 +288,7 @@ public void run() {
logger.info("write to another type");
tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get();
tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get();
tribeClient.admin().indices().prepareRefresh().get();
assertNoFailures(tribeClient.admin().indices().prepareRefresh().get());


logger.info("verify they are there");
Expand All @@ -310,35 +315,88 @@ public void run() {

logger.info("delete an index, and make sure its reflected");
cluster2.client().admin().indices().prepareDelete("test2").get();
awaitIndicesNotInClusterState("test2");

try {
logger.info("stop a node, make sure its reflected");
cluster2.stopRandomDataNode();
awaitSameNodeCounts();
} finally {
cluster2.startNode();
awaitSameNodeCounts();
}
}

@Test
public void testCloseAndOpenIndex() throws Exception {
//create an index and close it even before starting the tribe node
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
ensureGreen(internalCluster());
assertAcked(internalCluster().client().admin().indices().prepareClose("test1"));

setupTribeNode(ImmutableSettings.EMPTY);
awaitSameNodeCounts();

//the closed index is not part of the tribe node cluster state
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false));

//open the index, it becomes part of the tribe node cluster state
assertAcked(internalCluster().client().admin().indices().prepareOpen("test1"));
awaitIndicesInClusterState("test1");
ensureGreen(internalCluster());

//create a second index, wait till it is seen from within the tribe node
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);

//close the second index, wait till it gets removed from the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareClose("test2"));
awaitIndicesNotInClusterState("test2");

//open the second index, wait till it gets added back to the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareOpen("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);
}

private void awaitIndicesInClusterState(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertTrue(tribeState.getMetaData().hasIndex("test1"));
assertFalse(tribeState.getMetaData().hasIndex("test2"));
assertTrue(tribeState.getRoutingTable().hasIndex("test1"));
assertFalse(tribeState.getRoutingTable().hasIndex("test2"));
for (String index : indices) {
assertTrue(tribeState.getMetaData().hasIndex(index));
assertTrue(tribeState.getRoutingTable().hasIndex(index));
}
}
});

logger.info("stop a node, make sure its reflected");
cluster2.stopRandomDataNode();
awaitSameNodeCounts();
}

private void awaitIndicesInClusterState(final String... indices) throws Exception {
private void awaitIndicesNotInClusterState(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertTrue(tribeState.getMetaData().hasIndex(index));
assertTrue(tribeState.getRoutingTable().hasIndex(index));
assertFalse(tribeState.getMetaData().hasIndex(index));
assertFalse(tribeState.getRoutingTable().hasIndex(index));
}
}
});
}

private void ensureGreen(TestCluster testCluster) {
ClusterHealthResponse actionGet = testCluster.client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

private void awaitSameNodeCounts() throws Exception {
assertBusy(new Runnable() {
@Override
Expand Down

0 comments on commit 6c2139c

Please sign in to comment.