Skip to content

Commit

Permalink
Cluster State Update APIs (master node) to respect master_timeout better
Browse files Browse the repository at this point in the history
Currently, the master node might be processing too many cluster state events, and then be blocked on waiting for its respective even to be processed. We can use the new cluster state update timeout support to use the master_timeout value and respect it.

closes #3365
  • Loading branch information
kimchy committed Jul 22, 2013
1 parent 860985f commit 677f126
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected CloseIndexResponse masterOperation(CloseIndexRequest request, ClusterS
final AtomicReference<CloseIndexResponse> responseRef = new AtomicReference<CloseIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.index()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() {
stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.index()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() {
@Override
public void onResponse(MetaDataStateIndexService.Response response) {
responseRef.set(new CloseIndexResponse(response.acknowledged()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected OpenIndexResponse masterOperation(OpenIndexRequest request, ClusterSta
final AtomicReference<OpenIndexResponse> responseRef = new AtomicReference<OpenIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.index()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() {
stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.index()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() {
@Override
public void onResponse(MetaDataStateIndexService.Response response) {
responseRef.set(new OpenIndexResponse(response.acknowledged()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -58,7 +59,17 @@ public MetaDataStateIndexService(Settings settings, ClusterService clusterServic
}

public void closeIndex(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("close-index [" + request.index + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("close-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}

@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}

@Override
public ClusterState execute(ClusterState currentState) {

Expand Down Expand Up @@ -101,7 +112,17 @@ public void clusterStateProcessed(ClusterState clusterState) {
}

public void openIndex(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("open-index [" + request.index + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("open-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}

@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}

@Override
public ClusterState execute(ClusterState currentState) {

Expand Down Expand Up @@ -154,6 +175,7 @@ public static class Request {
final String index;

TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;

public Request(String index) {
this.index = index;
Expand All @@ -163,6 +185,11 @@ public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}

public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}

public static class Response {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
OpenIndexRequest openIndexRequest = new OpenIndexRequest(request.param("index"));
openIndexRequest.listenerThreaded(false);
openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
client.admin().indices().open(openIndexRequest, new ActionListener<OpenIndexResponse>() {
@Override
public void onResponse(OpenIndexResponse response) {
Expand Down

0 comments on commit 677f126

Please sign in to comment.