Skip to content

Commit

Permalink
No master (startup / minimum_master_node) / not recovered blocks shou…
Browse files Browse the repository at this point in the history
…ld cause proper failures on operations, closes #1589.
  • Loading branch information
kimchy committed Jan 6, 2012
1 parent 93cce59 commit ec8b7c3
Show file tree
Hide file tree
Showing 40 changed files with 627 additions and 151 deletions.
Expand Up @@ -20,14 +20,16 @@
package org.elasticsearch.action;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

/**
*
*/
public class UnavailableShardsException extends ElasticSearchException {

public UnavailableShardsException(ShardId shardId, String message) {
public UnavailableShardsException(@Nullable ShardId shardId, String message) {
super(buildMessage(shardId, message));
}

Expand All @@ -37,4 +39,9 @@ private static String buildMessage(ShardId shardId, String message) {
}
return "[" + shardId.index().name() + "][" + shardId.id() + "] " + message;
}

@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -65,10 +67,20 @@ protected BroadcastPingRequest newRequest() {
}

@Override
protected GroupShardsIterator shards(BroadcastPingRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, BroadcastPingRequest request, String[] concreteIndices) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), null, null);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, BroadcastPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, BroadcastPingRequest request, String[] concreteIndices) {
return null;
}

@Override
protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
Expand Down
Expand Up @@ -21,6 +21,9 @@

import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -85,4 +88,14 @@ protected GroupShardsIterator shards(IndexReplicationPingRequest indexRequest) {
protected ShardReplicationPingRequest newShardRequestInstance(IndexReplicationPingRequest indexRequest, int shardId) {
return new ShardReplicationPingRequest(indexRequest, shardId);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexReplicationPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexReplicationPingRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
}
Expand Up @@ -22,6 +22,9 @@
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -70,4 +73,14 @@ protected String transportAction() {
protected IndexReplicationPingRequest newIndexRequestInstance(ReplicationPingRequest request, String index, Set<String> routing) {
return new IndexReplicationPingRequest(request, index);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ReplicationPingRequest replicationPingRequest) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ReplicationPingRequest replicationPingRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, concreteIndices);
}
}
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -81,6 +83,16 @@ protected PrimaryResponse<ShardReplicationPingResponse, ShardReplicationPingRequ
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardReplicationPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ShardReplicationPingRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}

@Override
protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -51,7 +53,17 @@ protected String transportAction() {
}

@Override
protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException {
protected ClusterBlockException checkGlobalBlock(ClusterState state, SinglePingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, SinglePingRequest singlePingRequest) {
return null;
}

@Override
protected ShardIterator shards(ClusterState state, SinglePingRequest request) throws ElasticSearchException {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.index(), request.type, request.id, null, null);
}
Expand Down
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastStringReader;
Expand Down Expand Up @@ -87,13 +89,26 @@ protected String transportAction() {
}

@Override
protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) {
protected ClusterBlockException checkGlobalBlock(ClusterState state, AnalyzeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) {
request.index(state.metaData().concreteIndex(request.index()));
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
return null;
}

@Override
protected ShardsIterator shards(ClusterState state, AnalyzeRequest request) {
if (request.index() == null) {
// just execute locally....
return null;
}
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
}

@Override
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -160,7 +162,18 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClearIndicesCacheRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, ClearIndicesCacheRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ClearIndicesCacheRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ClearIndicesCacheRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}

}
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -123,7 +125,17 @@ protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws El
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(FlushRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, FlushRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, FlushRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -123,7 +125,18 @@ protected ShardGatewaySnapshotResponse shardOperation(ShardGatewaySnapshotReques
* The snapshot request works against all primary shards.
*/
@Override
protected GroupShardsIterator shards(GatewaySnapshotRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, GatewaySnapshotRequest request, String[] concreteIndices) {
return clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, GatewaySnapshotRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, GatewaySnapshotRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}

}
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -134,7 +136,17 @@ protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) thr
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(OptimizeRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, OptimizeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, OptimizeRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -143,7 +145,17 @@ protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throw
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(RefreshRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, RefreshRequest request, String[] concreteIndices) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RefreshRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RefreshRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -84,10 +86,20 @@ protected boolean ignoreNonActiveExceptions() {
* Segments goes across *all* active shards.
*/
@Override
protected GroupShardsIterator shards(IndicesSegmentsRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, IndicesSegmentsRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndicesSegmentsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndicesSegmentsRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}

@Override
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
Expand Down

0 comments on commit ec8b7c3

Please sign in to comment.