Skip to content

Commit

Permalink
Cluster State Update APIs (master node) to respect master_timeout better
Browse files Browse the repository at this point in the history
Currently, the master node might be processing too many cluster state events, and a request can then be blocked waiting for its respective event to be processed. We can use the new cluster state update timeout support to pass along the master_timeout value and respect it.

closes #3365
  • Loading branch information
kimchy committed Jul 22, 2013
1 parent e16f648 commit 860985f
Show file tree
Hide file tree
Showing 37 changed files with 295 additions and 49 deletions.
Expand Up @@ -23,12 +23,14 @@
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -76,7 +78,19 @@ protected ClusterRerouteResponse masterOperation(final ClusterRerouteRequest req
final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>();
final CountDownLatch latch = new CountDownLatch(1);

clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new TimeoutClusterStateUpdateTask() {

/**
 * Uses the request's master node timeout as the time limit for this
 * cluster state update task to be processed.
 */
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

/**
 * Invoked when the reroute task was not processed within {@link #timeout()}.
 * Stores a {@link ProcessClusterEventTimeoutException} in {@code failureRef}
 * and counts down the latch.
 */
@Override
public void onTimeout(TimeValue timeout, String source) {
    final ProcessClusterEventTimeoutException timeoutFailure =
            new ProcessClusterEventTimeoutException(timeout, source);
    failureRef.set(timeoutFailure);
    // NOTE(review): presumably releases the thread awaiting this latch in masterOperation - confirm.
    latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) {
try {
Expand All @@ -87,7 +101,7 @@ public ClusterState execute(ClusterState currentState) {
return currentState;
}
return newState;
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to reroute", e);
failureRef.set(e);
latch.countDown();
Expand Down
Expand Up @@ -24,9 +24,10 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
Expand All @@ -35,9 +36,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -92,7 +91,19 @@ protected ClusterUpdateSettingsResponse masterOperation(final ClusterUpdateSetti
final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder();
final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder();

clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() {

/**
 * Uses the request's master node timeout as the time limit for this
 * cluster state update task to be processed.
 */
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

/**
 * Invoked when the settings update task was not processed within
 * {@link #timeout()}. Stores a {@link ProcessClusterEventTimeoutException}
 * in {@code failureRef} and counts down the latch.
 */
@Override
public void onTimeout(TimeValue timeout, String source) {
    final ProcessClusterEventTimeoutException timeoutFailure =
            new ProcessClusterEventTimeoutException(timeout, source);
    failureRef.set(timeoutFailure);
    // NOTE(review): presumably releases the thread awaiting this latch in masterOperation - confirm.
    latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) {
try {
Expand Down Expand Up @@ -149,7 +160,7 @@ public ClusterState execute(ClusterState currentState) {
}

return ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).build();
} catch (Exception e) {
} catch (Throwable e) {
latch.countDown();
logger.warn("failed to update cluster settings", e);
return currentState;
Expand Down
Expand Up @@ -85,7 +85,7 @@ protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request,
final AtomicReference<IndicesAliasesResponse> responseRef = new AtomicReference<IndicesAliasesResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()), new MetaDataIndexAliasesService.Listener() {
indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexAliasesService.Listener() {
@Override
public void onResponse(MetaDataIndexAliasesService.Response response) {
responseRef.set(new IndicesAliasesResponse(response.acknowledged()));
Expand Down
Expand Up @@ -86,7 +86,8 @@ protected CreateIndexResponse masterOperation(CreateIndexRequest request, Cluste
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.timeout(request.timeout()),
.timeout(request.timeout())
.masterTimeout(request.masterNodeTimeout()),
new MetaDataCreateIndexService.Listener() {
@Override
public void onResponse(MetaDataCreateIndexService.Response response) {
Expand Down
Expand Up @@ -123,7 +123,7 @@ protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, final
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(request.indices().length);
for (final String index : request.indices()) {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()), new MetaDataDeleteIndexService.Listener() {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
responseRef.set(new DeleteIndexResponse(response.acknowledged()));
Expand Down
Expand Up @@ -114,7 +114,7 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
latch.countDown();
Expand All @@ -130,7 +130,7 @@ public void onFailure(Throwable t) {

@Override
public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
latch.countDown();
Expand Down
Expand Up @@ -90,7 +90,7 @@ protected PutMappingResponse masterOperation(PutMappingRequest request, ClusterS
final AtomicReference<PutMappingResponse> responseRef = new AtomicReference<PutMappingResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()), new MetaDataMappingService.Listener() {
metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
responseRef.set(new PutMappingResponse(response.acknowledged()));
Expand Down
Expand Up @@ -71,7 +71,7 @@ protected UpdateSettingsResponse masterOperation(UpdateSettingsRequest request,
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

updateSettingsService.updateSettings(request.settings(), request.indices(), new MetaDataUpdateSettingsService.Listener() {
updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() {
@Override
public void onSuccess() {
latch.countDown();
Expand Down
Expand Up @@ -79,7 +79,7 @@ protected DeleteIndexTemplateResponse masterOperation(DeleteIndexTemplateRequest
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()), new MetaDataIndexTemplateService.RemoveListener() {
indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexTemplateService.RemoveListener() {
@Override
public void onResponse(MetaDataIndexTemplateService.RemoveResponse response) {
responseRef.set(new DeleteIndexTemplateResponse(response.acknowledged()));
Expand Down
Expand Up @@ -89,7 +89,8 @@ protected PutIndexTemplateResponse masterOperation(PutIndexTemplateRequest reque
.settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.create(request.create()),
.create(request.create())
.masterTimeout(request.masterNodeTimeout()),

new MetaDataIndexTemplateService.PutListener() {
@Override
Expand Down
Expand Up @@ -25,14 +25,16 @@
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmerMissingException;
Expand Down Expand Up @@ -91,7 +93,19 @@ protected DeleteWarmerResponse masterOperation(final DeleteWarmerRequest request
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() {

/**
 * Uses the request's master node timeout as the time limit for this
 * cluster state update task to be processed.
 */
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

/**
 * Invoked when the delete-warmer task was not processed within
 * {@link #timeout()}. Stores a {@link ProcessClusterEventTimeoutException}
 * in {@code failureRef} and counts down the latch.
 */
@Override
public void onTimeout(TimeValue timeout, String source) {
    final ProcessClusterEventTimeoutException timeoutFailure =
            new ProcessClusterEventTimeoutException(timeout, source);
    failureRef.set(timeoutFailure);
    // NOTE(review): presumably releases the thread awaiting this latch in masterOperation - confirm.
    latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) {
try {
Expand Down
Expand Up @@ -25,14 +25,16 @@
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
Expand Down Expand Up @@ -101,7 +103,19 @@ protected PutWarmerResponse masterOperation(final PutWarmerRequest request, Clus
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() {

/**
 * Uses the request's master node timeout as the time limit for this
 * cluster state update task to be processed.
 */
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

/**
 * Invoked when the put-warmer task was not processed within
 * {@link #timeout()}. Stores a {@link ProcessClusterEventTimeoutException}
 * in {@code failureRef} and counts down the latch.
 */
@Override
public void onTimeout(TimeValue timeout, String source) {
    final ProcessClusterEventTimeoutException timeoutFailure =
            new ProcessClusterEventTimeoutException(timeout, source);
    failureRef.set(timeoutFailure);
    // NOTE(review): presumably releases the thread awaiting this latch in masterOperation - confirm.
    latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
Expand Down
Expand Up @@ -25,17 +25,17 @@
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask {
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {

/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onTimeout(String)}.
* {@link #onTimeout(TimeValue, String)}.
*/
TimeValue timeout();

/**
* Called when the cluster state update task wasn't processed by the provided
* {@link #timeout()}.
*/
void onTimeout(String source);
void onTimeout(TimeValue timeout, String source);
}
Expand Up @@ -27,9 +27,10 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -135,7 +136,18 @@ public void createIndex(final Request request, final Listener userListener) {

final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener);

clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {

/**
 * Uses the create-index request's {@code masterTimeout} as the time limit
 * for this cluster state update task to be processed.
 */
@Override
public TimeValue timeout() {
return request.masterTimeout;
}

/**
 * Invoked when the create-index task was not processed within
 * {@link #timeout()}. Reports a {@link ProcessClusterEventTimeoutException}
 * to the create-index listener as a failure.
 */
@Override
public void onTimeout(TimeValue timeout, String source) {
    final ProcessClusterEventTimeoutException timeoutFailure =
            new ProcessClusterEventTimeoutException(timeout, source);
    listener.onFailure(timeoutFailure);
}

@Override
public ClusterState execute(ClusterState currentState) {
boolean indexCreated = false;
Expand Down Expand Up @@ -528,7 +540,6 @@ public static interface Listener {
public static class Request {

final String cause;

final String index;

State state = State.OPEN;
Expand All @@ -541,6 +552,7 @@ public static class Request {


TimeValue timeout = TimeValue.timeValueSeconds(5);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;

Set<ClusterBlock> blocks = Sets.newHashSet();

Expand Down Expand Up @@ -592,6 +604,11 @@ public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}

/**
 * Sets the timeout that the create-index cluster state update task returns
 * from its {@code timeout()} method.
 *
 * @param masterTimeout the timeout value to store on this request
 * @return this request, to allow call chaining
 */
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}

public static class Response {
Expand Down

0 comments on commit 860985f

Please sign in to comment.