Skip to content

Commit

Permalink
Add missing cluster blocks handling for master operations
Browse files Browse the repository at this point in the history
Master node related operations were missing proper handling of cluster blocks, allowing for example to perform cluster level update settings even before the state was fully restored on initial cluster startup

Note, the change allows to change read only related settings without checking for blocks on update settings, as without it, it means one can't re-enable metadata/write. Also, it doesn't check for blocks on cluster state and health API, as those are allowed to be used even when blocked to figure out what causes the block.
closes #7763
closes #7740
  • Loading branch information
kimchy committed Sep 17, 2014
1 parent 2141f28 commit 4e87c58
Show file tree
Hide file tree
Showing 25 changed files with 295 additions and 121 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -59,6 +60,11 @@ protected String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
protected ClusterBlockException checkBlock(ClusterHealthRequest request, ClusterState state) {
return null; // we want users to be able to call this even when there are global blocks, just to check the health (are there blocks?)
}

@Override
protected ClusterHealthRequest newRequest() {
return new ClusterHealthRequest();
Expand Down
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -71,6 +73,11 @@ protected String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
protected ClusterBlockException checkBlock(NodesShutdownRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest();
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
Expand Down Expand Up @@ -54,6 +56,11 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(ClusterRerouteRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterRerouteRequest newRequest() {
return new ClusterRerouteRequest();
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -67,6 +69,17 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) {
// allow for dedicated changes to the metadata blocks, so we don't block those to allow to "re-enable" it
if ((request.transientSettings().getAsMap().isEmpty() && request.persistentSettings().getAsMap().size() == 1 && request.persistentSettings().get(MetaData.SETTING_READ_ONLY) != null) ||
request.persistentSettings().getAsMap().isEmpty() && request.transientSettings().getAsMap().size() == 1 && request.transientSettings().get(MetaData.SETTING_READ_ONLY) != null) {
return null;
}
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}


@Override
protected ClusterUpdateSettingsRequest newRequest() {
return new ClusterUpdateSettingsRequest();
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
Expand Down Expand Up @@ -54,6 +56,11 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected ClusterSearchShardsRequest newRequest() {
return new ClusterSearchShardsRequest();
Expand Down
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -66,6 +68,11 @@ protected String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
protected ClusterBlockException checkBlock(SnapshotsStatusRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected SnapshotsStatusRequest newRequest() {
return new SnapshotsStatusRequest();
Expand Down Expand Up @@ -105,22 +112,22 @@ protected void masterOperation(final SnapshotsStatusRequest request,

transportNodesSnapshotsStatus.status(nodesIds.toArray(new String[nodesIds.size()]),
snapshotIds, request.masterNodeTimeout(), new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), request.snapshots());
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
listener.onFailure(e);
}
}
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), request.snapshots());
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} else {
// We don't have any in-progress shards, just return current stats
listener.onResponse(buildResponse(request, currentSnapshots, null));
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -54,6 +56,15 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterState state) {
// cluster state calls are done also on a fully blocked cluster to figure out what is going
// on in the cluster. For example, which nodes have joined yet the recovery has not yet kicked
// in, we need to make sure we allow those calls
// return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
return null;
}

@Override
protected ClusterStateRequest newRequest() {
return new ClusterStateRequest();
Expand Down
Expand Up @@ -21,10 +21,13 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -48,6 +51,11 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(PendingClusterTasksRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected PendingClusterTasksRequest newRequest() {
return new PendingClusterTasksRequest();
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -45,6 +47,11 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(GetAliasesRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -48,6 +50,11 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(GetAliasesRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();
Expand All @@ -62,7 +69,7 @@ protected GetAliasesResponse newResponse() {
protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener<GetAliasesResponse> listener) throws ElasticsearchException {
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
@SuppressWarnings("unchecked") // ImmutableList to List results incompatible type
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices);
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices);
listener.onResponse(new GetAliasesResponse(result));
}

Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Strings;
Expand All @@ -44,10 +46,21 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex

@Inject
public TransportGetIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
}

@Override
protected String executor() {
// very lightweight operation, no need to fork
return ThreadPool.Names.SAME;
}

@Override
protected ClusterBlockException checkBlock(GetIndexRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected GetIndexRequest newRequest() {
return new GetIndexRequest();
Expand All @@ -60,7 +73,7 @@ protected GetIndexResponse newResponse() {

@Override
protected void doMasterOperation(final GetIndexRequest request, String[] concreteIndices, final ClusterState state,
final ActionListener<GetIndexResponse> listener) throws ElasticsearchException {
final ActionListener<GetIndexResponse> listener) throws ElasticsearchException {
ImmutableOpenMap<String, ImmutableList<Entry>> warmersResult = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappingsResult = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableList<AliasMetaData>> aliasesResult = ImmutableOpenMap.of();
Expand All @@ -72,40 +85,40 @@ protected void doMasterOperation(final GetIndexRequest request, String[] concret
boolean doneWarmers = false;
for (String feature : features) {
switch (feature) {
case "_warmer":
case "_warmers":
if (!doneWarmers) {
warmersResult = state.metaData().findWarmers(concreteIndices, request.types(), Strings.EMPTY_ARRAY);
doneWarmers = true;
}
break;
case "_mapping":
case "_mappings":
if (!doneMappings) {
mappingsResult = state.metaData().findMappings(concreteIndices, request.types());
doneMappings = true;
}
break;
case "_alias":
case "_aliases":
if (!doneAliases) {
aliasesResult = state.metaData().findAliases(Strings.EMPTY_ARRAY, concreteIndices);
doneAliases = true;
}
break;
case "_settings":
if (!doneSettings) {
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
for (String index : concreteIndices) {
settingsMapBuilder.put(index, state.metaData().index(index).getSettings());
case "_warmer":
case "_warmers":
if (!doneWarmers) {
warmersResult = state.metaData().findWarmers(concreteIndices, request.types(), Strings.EMPTY_ARRAY);
doneWarmers = true;
}
break;
case "_mapping":
case "_mappings":
if (!doneMappings) {
mappingsResult = state.metaData().findMappings(concreteIndices, request.types());
doneMappings = true;
}
break;
case "_alias":
case "_aliases":
if (!doneAliases) {
aliasesResult = state.metaData().findAliases(Strings.EMPTY_ARRAY, concreteIndices);
doneAliases = true;
}
break;
case "_settings":
if (!doneSettings) {
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
for (String index : concreteIndices) {
settingsMapBuilder.put(index, state.metaData().index(index).getSettings());
}
settings = settingsMapBuilder.build();
doneSettings = true;
}
settings = settingsMapBuilder.build();
doneSettings = true;
}
break;
break;

default:
throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid");
default:
throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid");
}
}
listener.onResponse(new GetIndexResponse(concreteIndices, warmersResult, mappingsResult, aliasesResult, settings));
Expand Down

0 comments on commit 4e87c58

Please sign in to comment.