Skip to content

Commit

Permalink
cluster block with auto create index bulk action can cause bulk execu…
Browse files Browse the repository at this point in the history
…tion to not return

when there is a cluster block (like no master yet discovered), the bulk action doesn't properly catch the exception of inner execute to notify the listener, causing the bulk operation to hang
closes #7086
  • Loading branch information
kimchy authored and areek committed Sep 8, 2014
1 parent 35631b4 commit 2294699
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public BulkRequest newRequestInstance(){
@Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

if (autoCreateIndex.needToCheck()) {
final Set<String> indices = Sets.newHashSet();
Expand Down Expand Up @@ -125,7 +125,7 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul
ClusterState state = clusterService.state();
for (final String index : indices) {
if (autoCreateIndex.shouldAutoCreate(index, state)) {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
Expand All @@ -145,7 +145,11 @@ public void onFailure(Throwable e) {
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener, responses);
try {
executeBulk(bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
}
}
}
});
Expand Down
49 changes: 36 additions & 13 deletions src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

package org.elasticsearch.cluster;

import com.google.common.base.Predicate;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
Expand All @@ -36,21 +37,24 @@
import java.util.HashMap;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;

/**
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class NoMasterNodeTests extends ElasticsearchIntegrationTest {

@Test
@TestLogging("action:TRACE,cluster.service:TRACE")
public void testNoMasterActions() throws Exception {
// note, sometimes, we want to check with the fact that an index gets created, sometimes not...
boolean autoCreateIndex = randomBoolean();
logger.info("auto_create_index set to {}", autoCreateIndex);

Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("action.auto_create_index", false)
.put("action.auto_create_index", autoCreateIndex)
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
Expand All @@ -75,14 +79,14 @@ public void run() {
try {
client().prepareGet("test", "type1", "1").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

try {
client().prepareMultiGet().add("test", "type1", "1").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

Expand All @@ -93,42 +97,61 @@ public void run() {
.setIndices("test").setDocumentType("type1")
.setSource(percolateSource).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

long now = System.currentTimeMillis();
try {
client().prepareUpdate("test", "type1", "1").setScript("test script", ScriptService.ScriptType.INLINE).setTimeout(timeout).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

try {
client().admin().indices().prepareAnalyze("test", "this is a test").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

try {
client().prepareCount("test").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

now = System.currentTimeMillis();
try {
client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

now = System.currentTimeMillis();
try {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.setTimeout(timeout);
bulkRequestBuilder.get();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
if (autoCreateIndex) {
// if its auto create index, the timeout will be based on the create index API
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
} else {
// TODO note, today we don't retry on global block for bulk operations-Dtests.seed=80C397728140167
assertThat(System.currentTimeMillis() - now, lessThan(50l));
}
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

internalCluster().startNode(settings);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
}
Expand Down

0 comments on commit 2294699

Please sign in to comment.