Skip to content

Commit

Permalink
internal custom allocation commands
Browse files Browse the repository at this point in the history
add support for internal custom allocation commands, including allocation, move, and cancel (shard).
Also fixes #2242, where the cluster state ended up inconsistent when a shard that is the source of a relocation is failed.
  • Loading branch information
kimchy committed Sep 12, 2012
1 parent 378c3b0 commit ad23541
Show file tree
Hide file tree
Showing 19 changed files with 1,235 additions and 192 deletions.
Expand Up @@ -168,7 +168,7 @@ public void handleException(TransportException exp) {
});
t.start();
} else {
final String[] nodesIds = state.nodes().resolveNodes(request.nodesIds);
final String[] nodesIds = state.nodes().resolveNodesIds(request.nodesIds);
logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay);

for (String nodeId : nodesIds) {
Expand Down
Expand Up @@ -119,7 +119,7 @@ private AsyncAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
clusterState = clusterService.state();
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
String[] nodesIds = clusterState.nodes().resolveNodesIds(request.nodesIds());
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length);
}
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
Expand Down Expand Up @@ -177,7 +178,18 @@ public boolean isAllNodes(String... nodesIds) {
return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
}

public String[] resolveNodes(String... nodesIds) {
/**
 * Resolves the given node expression into exactly one node.
 *
 * @param node the node expression, resolved through {@link #resolveNodesIds(String...)}
 * @return the single node the expression resolved to
 * @throws ElasticSearchIllegalArgumentException if the expression matches no node, or more than one node
 */
public DiscoveryNode resolveNode(String node) {
    final String[] matches = resolveNodesIds(node);
    // the two failure cases are mutually exclusive, handle the empty one first
    if (matches.length == 0) {
        throw new ElasticSearchIllegalArgumentException("failed to resolve [" + node + " ], no matching nodes");
    }
    if (matches.length > 1) {
        throw new ElasticSearchIllegalArgumentException("resolved [" + node + "] into [" + matches.length + "] nodes, where expected to be resolved to a single node");
    }
    final String singleNodeId = matches[0];
    return nodes.get(singleNodeId);
}

public String[] resolveNodesIds(String... nodesIds) {
if (isAllNodes(nodesIds)) {
int index = 0;
nodesIds = new String[nodes.size()];
Expand Down
Expand Up @@ -242,7 +242,8 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
// we check on instanceof so we handle the MutableShardRouting case as well
if (o == null || !(o instanceof ImmutableShardRouting)) return false;

ImmutableShardRouting that = (ImmutableShardRouting) o;

Expand Down
Expand Up @@ -24,11 +24,9 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;

import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
Expand Down Expand Up @@ -57,6 +55,12 @@ public RoutingNodes(ClusterState clusterState) {
this.blocks = clusterState.blocks();
this.routingTable = clusterState.routingTable();
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
// fill in the nodeToShards with the "live" nodes
for (DiscoveryNode node : clusterState.nodes().dataNodes().values()) {
nodesToShards.put(node.id(), new ArrayList<MutableShardRouting>());
}

// fill in the inverse of node -> shards allocated
for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
for (ShardRouting shard : indexShard) {
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -71,6 +71,8 @@ public AllocationExplanation explanation() {

private Map<ShardId, String> ignoredShardToNodes = null;

private Map<ShardId, String> ignoreDisable = null;

public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.deciders = deciders;
this.routingNodes = routingNodes;
Expand Down Expand Up @@ -101,6 +103,17 @@ public AllocationExplanation explanation() {
return explanation;
}

/**
 * Records the given node as the "ignore disable" node for the given shard.
 * The backing map is created lazily on first use; a later call for the same
 * shard replaces the previously recorded node.
 *
 * @param shardId the shard the entry is recorded for
 * @param nodeId  the node id recorded for that shard
 */
public void addIgnoreDisable(ShardId shardId, String nodeId) {
    Map<ShardId, String> entries = ignoreDisable;
    if (entries == null) {
        // first registration, create the map lazily
        entries = new HashMap<ShardId, String>();
        ignoreDisable = entries;
    }
    entries.put(shardId, nodeId);
}

/**
 * Checks whether the given node was recorded through
 * {@link #addIgnoreDisable(ShardId, String)} for the given shard.
 *
 * @param shardId the shard to look up
 * @param nodeId  the node id to compare against the recorded one
 * @return <tt>true</tt> only if an entry exists for the shard and it equals <tt>nodeId</tt>
 */
public boolean shouldIgnoreDisable(ShardId shardId, String nodeId) {
    if (ignoreDisable == null) {
        // nothing was ever registered
        return false;
    }
    final String registeredNodeId = ignoreDisable.get(shardId);
    return nodeId.equals(registeredNodeId);
}

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
Expand Down
@@ -0,0 +1,86 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.command;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.index.shard.ShardId;

import java.util.Iterator;

/**
 * Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
 * allocation, which might mean one will lose data if using local gateway; use the
 * <tt>allowPrimary</tt> flag with care.
 */
public class AllocateAllocationCommand implements AllocationCommand {

    private final ShardId shardId;
    private final String nodeId;
    private final boolean allowPrimary;

    /**
     * @param shardId      the shard to allocate
     * @param nodeId       the node expression to allocate the shard on, it must resolve to a single node
     * @param allowPrimary whether a primary shard is allowed to be allocated by this command
     */
    public AllocateAllocationCommand(ShardId shardId, String nodeId, boolean allowPrimary) {
        this.shardId = shardId;
        this.nodeId = nodeId;
        this.allowPrimary = allowPrimary;
    }

    /**
     * Picks a matching shard from the unassigned list (preferring a primary), verifies that the
     * allocation deciders allow it on the target node, and then moves it from the unassigned
     * list onto that node.
     *
     * @param allocation the allocation to apply the command on
     * @throws ElasticSearchIllegalArgumentException if the node can't be resolved, has no routing node,
     *                                               the shard is not unassigned, it is a primary and
     *                                               <tt>allowPrimary</tt> is not set, or the deciders
     *                                               reject the allocation
     */
    @Override
    public void execute(RoutingAllocation allocation) throws ElasticSearchException {
        DiscoveryNode node = allocation.nodes().resolveNode(nodeId);

        MutableShardRouting shardRouting = null;
        for (MutableShardRouting routing : allocation.routingNodes().unassigned()) {
            if (routing.shardId().equals(shardId)) {
                // prefer primaries first to allocate
                if (shardRouting == null || routing.primary()) {
                    shardRouting = routing;
                }
            }
        }

        if (shardRouting == null) {
            throw new ElasticSearchIllegalArgumentException("[allocate] failed to find " + shardId + " on the list of unassigned shards");
        }

        if (shardRouting.primary() && !allowPrimary) {
            throw new ElasticSearchIllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + ", which is disabled");
        }

        RoutingNode routingNode = allocation.routingNodes().node(node.id());
        if (routingNode == null) {
            // the node resolved fine, but there is no routing node for it (presumably it is not
            // a data node - TODO confirm); fail with a clear error instead of a NullPointerException
            throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + node + " is not allowed, no routing node found for it");
        }
        // register the shard/node pair before consulting the deciders
        allocation.addIgnoreDisable(shardRouting.shardId(), routingNode.nodeId());
        if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) {
            throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + node + " is not allowed");
        }
        // go over and remove it from the unassigned (identity comparison, we want the exact instance picked above)
        for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {
            if (it.next() != shardRouting) {
                continue;
            }
            it.remove();
            routingNode.add(shardRouting);
            break;
        }
    }
}
@@ -0,0 +1,30 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.command;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

/**
 * A single command that is applied on a {@link RoutingAllocation}.
 */
public interface AllocationCommand {

    /**
     * Executes the command on the provided allocation.
     *
     * @param allocation the allocation the command operates on
     * @throws ElasticSearchException if the command fails to execute
     */
    void execute(RoutingAllocation allocation) throws ElasticSearchException;
}
@@ -0,0 +1,53 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.command;

import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

import java.util.Arrays;
import java.util.List;

/**
 * A list of {@link AllocationCommand}s that are executed one after the other,
 * in the order they were added.
 */
public class AllocationCommands {

    private final List<AllocationCommand> commands = Lists.newArrayList();

    /**
     * Creates the list with the given initial commands (a <tt>null</tt> array is treated as empty).
     */
    public AllocationCommands(AllocationCommand... commands) {
        append(commands);
    }

    /**
     * Appends the given commands (a <tt>null</tt> array is ignored).
     *
     * @return this instance, to allow chaining
     */
    public AllocationCommands add(AllocationCommand... commands) {
        append(commands);
        return this;
    }

    /**
     * Executes all the commands, in order, on the provided allocation. An exception
     * thrown by a command propagates and the remaining commands are not executed.
     */
    public void execute(RoutingAllocation allocation) throws ElasticSearchException {
        for (AllocationCommand current : commands) {
            current.execute(allocation);
        }
    }

    // shared by the constructor and add(...): appends the array content, ignoring a null array
    private void append(AllocationCommand[] toAppend) {
        if (toAppend == null) {
            return;
        }
        this.commands.addAll(Arrays.asList(toAppend));
    }
}
@@ -0,0 +1,114 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.command;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.index.shard.ShardId;

import java.util.Iterator;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;

/**
 * A command that cancels relocation, or recovery, of a given shard on a node.
 */
public class CancelAllocationCommand implements AllocationCommand {

    private final ShardId shardId;

    private final String nodeId;

    /**
     * @param shardId the shard to cancel
     * @param node    the node expression the shard is currently on, it must resolve to a single node
     */
    public CancelAllocationCommand(ShardId shardId, String node) {
        this.shardId = shardId;
        this.nodeId = node;
    }

    /**
     * Cancels the shard on the resolved node. Three cases are handled:
     * <ul>
     * <li>the shard is initializing as the target of a relocation: it is removed from the node, and
     * the relocation is cancelled on the shard it is being relocated from</li>
     * <li>the shard is relocating to another node (non primary only): it is removed and re-added as
     * unassigned, and the initializing copy on the target node is removed as well</li>
     * <li>the shard is not part of a relocation (non primary only): it is removed and re-added as unassigned</li>
     * </ul>
     *
     * @param allocation the allocation to apply the command on
     * @throws ElasticSearchIllegalArgumentException if the node can't be resolved, the shard is not found
     *                                               on the node, or the shard is a primary that can't be cancelled
     */
    @Override
    public void execute(RoutingAllocation allocation) throws ElasticSearchException {
        DiscoveryNode node = allocation.nodes().resolveNode(nodeId);

        boolean found = false;
        // no routing node at all simply means the shard can't be found on it (handled below)
        RoutingNode routingNode = allocation.routingNodes().node(node.id());
        if (routingNode != null) {
            for (Iterator<MutableShardRouting> it = routingNode.iterator(); it.hasNext(); ) {
                MutableShardRouting shardRouting = it.next();
                if (!shardRouting.shardId().equals(shardId)) {
                    continue;
                }
                found = true;
                if (shardRouting.relocatingNodeId() != null) {
                    if (shardRouting.initializing()) {
                        // the shard is initializing and recovering from another node, simply cancel the recovery.
                        // capture the source node id first, deassigning the shard may reset it (TODO confirm)
                        String relocatingFromNodeId = shardRouting.relocatingNodeId();
                        it.remove();
                        shardRouting.deassignNode();
                        // and cancel the relocating state from the shard its being relocated from
                        RoutingNode relocatingFromNode = allocation.routingNodes().node(relocatingFromNodeId);
                        if (relocatingFromNode != null) {
                            for (MutableShardRouting fromShardRouting : relocatingFromNode) {
                                // the state to check is the one of the source shard, not of the (cancelled) target
                                if (fromShardRouting.shardId().equals(shardId) && fromShardRouting.state() == RELOCATING) {
                                    fromShardRouting.cancelRelocation();
                                    break;
                                }
                            }
                        }
                    } else if (shardRouting.relocating()) {
                        // the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one
                        if (shardRouting.primary()) {
                            // can't cancel a primary shard being initialized
                            throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + node + ", shard is primary and initializing its state");
                        }
                        String relocatingToNodeId = shardRouting.relocatingNodeId();
                        it.remove();
                        allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
                                null, shardRouting.primary(), ShardRoutingState.UNASSIGNED, shardRouting.version() + 1));

                        // now, go and find the shard that is initializing on the target node, and cancel it as well...
                        RoutingNode initializingNode = allocation.routingNodes().node(relocatingToNodeId);
                        if (initializingNode != null) {
                            for (Iterator<MutableShardRouting> itX = initializingNode.iterator(); itX.hasNext(); ) {
                                MutableShardRouting initializingShardRouting = itX.next();
                                if (initializingShardRouting.shardId().equals(shardId) && initializingShardRouting.state() == INITIALIZING) {
                                    // remove and deassign the initializing copy on the target node (not the source shard)
                                    itX.remove();
                                    initializingShardRouting.deassignNode();
                                }
                            }
                        }
                    }
                } else {
                    // the shard is not relocating, its either started, or initializing, just cancel it and move on...
                    if (shardRouting.primary()) {
                        // can't cancel a primary shard being initialized
                        throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + node + ", shard is primary and initializing its state");
                    }
                    it.remove();
                    allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
                            null, shardRouting.primary(), ShardRoutingState.UNASSIGNED, shardRouting.version() + 1));
                }
            }
        }

        if (!found) {
            throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + ", failed to find it on node " + node);
        }
    }
}

0 comments on commit ad23541

Please sign in to comment.