Improve filtering of nodes when using client-only routing
relates #375 #373
costin committed Feb 10, 2015
1 parent 602e7a2 commit b358ce9
Showing 7 changed files with 79 additions and 47 deletions.
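For context: client-only routing sends all connector traffic through dedicated client nodes (no data, no master role) rather than talking to data nodes directly, and the filtering this commit improves boils down to keeping only the client nodes that actually expose HTTP. A minimal sketch of that idea — NodeInfo and its fields are illustrative stand-ins, not the connector's actual types:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of client-only node filtering; NodeInfo and its fields are
// illustrative stand-ins, not the connector's actual types.
final class ClientOnlyFilter {

    static final class NodeInfo {
        final String id;
        final boolean isClient; // client node: no data, no master role
        final boolean hasHttp;  // HTTP transport enabled

        NodeInfo(String id, boolean isClient, boolean hasHttp) {
            this.id = id;
            this.isClient = isClient;
            this.hasHttp = hasHttp;
        }
    }

    // Keep only client nodes that can actually serve HTTP traffic.
    static List<String> httpClientNodes(List<NodeInfo> cluster) {
        List<String> result = new ArrayList<String>();
        for (NodeInfo node : cluster) {
            if (node.isClient && node.hasHttp) {
                result.add(node.id);
            }
        }
        return result;
    }
}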
@@ -65,7 +65,7 @@ public void stop() throws Exception {

@Test
public void testShardInfo() throws Exception {
Map<Shard, Node> shards = (Map<Shard, Node>) client.getReadTargetShards()[1];
Map<Shard, Node> shards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];
System.out.println(shards);
assertNotNull(shards);
}
@@ -90,7 +90,7 @@ public void testQueryBuilder() throws Exception {

@Test
public void testQueryShards() throws Exception {
Map<Shard, Node> targetShards = (Map<Shard, Node>) client.getReadTargetShards()[1];
Map<Shard, Node> targetShards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];

Field mapping = client.getMapping();
ScrollReader reader = new ScrollReader(new JdkValueReader(), mapping, true, "_metadata", false);
@@ -77,9 +77,9 @@ public static void filterNonClientNodesIfNeeded(Settings settings, Log log) {

RestClient bootstrap = new RestClient(settings);
try {
List<String> clientNodes = bootstrap.getClientNodes();
List<String> clientNodes = bootstrap.getHttpClientNodes();
if (clientNodes.isEmpty()) {
throw new EsHadoopIllegalArgumentException("Client-only routing specified but not client nodes were found in the cluster...");
throw new EsHadoopIllegalArgumentException("Client-only routing specified but no client nodes with HTTP-enabled were found in the cluster...");
}
if (log.isDebugEnabled()) {
log.debug(String.format("Found client nodes %s", clientNodes));
@@ -91,6 +91,18 @@ public static void filterNonClientNodesIfNeeded(Settings settings, Log log) {
if (log.isDebugEnabled()) {
log.debug(String.format("Filtered discovered only nodes %s to client-only %s", SettingsUtils.discoveredOrDeclaredNodes(settings), ddNodes));
}

if (ddNodes.isEmpty()) {
String message = "Client-only routing specified but no client nodes with HTTP-enabled available; ";
if (settings.getNodesDiscovery()) {
message += String.format("looks like the client nodes discovered have been removed; is the cluster in a stable state? %s", clientNodes);
}
else {
message += String.format("node discovery is disabled and none of nodes specified fits the criterion %s", SettingsUtils.discoveredOrDeclaredNodes(settings));
}
throw new EsHadoopIllegalArgumentException(message);
}

SettingsUtils.setDiscoveredNodes(settings, ddNodes);
} finally {
bootstrap.close();
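The filtering above intersects the nodes the connector already knows about (declared in the configuration or discovered from the cluster) with the cluster's client nodes; the new empty-check then reports which of the two sources ran dry. A rough sketch of the intersection, assuming plain host:port strings — the actual matching in SettingsUtils may normalize addresses differently:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: intersect the declared/discovered nodes with the cluster's client
// nodes; kept entries are the only valid targets under client-only routing.
final class NodeIntersection {

    static List<String> clientOnly(List<String> declaredOrDiscovered, List<String> clientNodes) {
        List<String> filtered = new ArrayList<String>(declaredOrDiscovered);
        filtered.retainAll(clientNodes); // keep only entries that are client nodes
        return filtered;
    }

    public static void main(String[] args) {
        // prints [10.0.0.2:9200]
        System.out.println(clientOnly(
                Arrays.asList("10.0.0.1:9200", "10.0.0.2:9200"),
                Arrays.asList("10.0.0.2:9200", "10.0.0.3:9200")));
    }
}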
@@ -256,20 +256,20 @@ public List<List<Map<String, Object>>> targetShards(String index) {
return shardsJson;
}

public Map<String, Node> getNodes() {
public Map<String, Node> getHttpNodes(boolean allowNonHttp) {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
Map<String, Node> nodes = new LinkedHashMap<String, Node>();

for (Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
Node node = new Node(entry.getKey(), entry.getValue());
if (node.hasHttp()) {
if (allowNonHttp || node.hasHttp()) {
nodes.put(entry.getKey(), node);
}
}
return nodes;
}

public List<String> getClientNodes() {
public List<String> getHttpClientNodes() {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
List<String> nodes = new ArrayList<String>();

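The renamed getHttpNodes carries an allowNonHttp flag because, under client-only routing, requests flow through the client nodes; a shard's home data node no longer needs HTTP enabled to be a valid lookup target. A condensed sketch of the predicate — the Node type here only loosely mirrors the project's DTO:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the allowNonHttp predicate used when collecting nodes.
final class HttpNodeFilter {

    static final class Node {
        final String id;
        final boolean hasHttp;

        Node(String id, boolean hasHttp) {
            this.id = id;
            this.hasHttp = hasHttp;
        }
    }

    static Map<String, Node> select(Iterable<Node> cluster, boolean allowNonHttp) {
        Map<String, Node> nodes = new LinkedHashMap<String, Node>();
        for (Node node : cluster) {
            // Under client-only routing the traffic goes through client nodes,
            // so a shard's home node is acceptable even without HTTP.
            if (allowNonHttp || node.hasHttp) {
                nodes.put(node.id, node);
            }
        }
        return nodes;
    }
}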
30 changes: 16 additions & 14 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java
@@ -271,19 +271,21 @@ public RestClient getRestClient() {
}


public Object[] getReadTargetShards() {
public Object[] getReadTargetShards(boolean clientNodesOnly) {
for (int retries = 0; retries < 3; retries++) {
Object[] result = doGetReadTargetShards();
Object[] result = doGetReadTargetShards(clientNodesOnly);
if (result != null) {
return result;
}
}
throw new EsHadoopIllegalStateException("Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable");
}

protected Object[] doGetReadTargetShards() {
protected Object[] doGetReadTargetShards(boolean clientNodesOnly) {
List<List<Map<String, Object>>> info = client.targetShards(resourceR.index());
Map<String, Node> nodes = client.getNodes();

// if client-nodes routing is used, allow non-http clients
Map<String, Node> httpNodes = client.getHttpNodes(clientNodesOnly);

Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>();

@@ -299,7 +301,7 @@ protected Object[] doGetReadTargetShards() {
if (!isReadIndexConcrete()) {
String message = String.format("Read resource [%s] includes multiple indices or/and aliases; to avoid duplicate results (caused by shard overlapping), parallelism ", resourceR);

Map<Shard, Node> combination = ShardSorter.find(info, nodes);
Map<Shard, Node> combination = ShardSorter.find(info, httpNodes, log);
if (combination.isEmpty()) {
message += "is minimized";
log.warn(message);
@@ -316,8 +318,8 @@ protected Object[] doGetReadTargetShards() {
message += String.format("is reduced from %s to %s", initialParallelism, combination.size());
log.warn(message);
}
result[0] = overlappingShards;
result[1] = combination;
result[0] = overlappingShards;
result[1] = combination;

return result;
}
@@ -329,9 +331,9 @@ protected Object[] doGetReadTargetShards() {
for (Map<String, Object> shardData : shardGroup) {
Shard shard = new Shard(shardData);
if (shard.getState().isStarted()) {
Node node = nodes.get(shard.getNode());
Node node = httpNodes.get(shard.getNode());
if (node == null) {
log.warn(String.format("Cannot find node with id [%s] (is HTTP enabled?) from shard [%s] in nodes [%s]; layout [%s]", shard.getNode(), shard, nodes, info));
log.warn(String.format("Cannot find node with id [%s] (is HTTP enabled?) from shard [%s] in nodes [%s]; layout [%s]", shard.getNode(), shard, httpNodes, info));
return null;
}
// when dealing with overlapping shards, simply keep a shard for each id/name (0, 1, ...)
@@ -342,28 +344,28 @@ protected Object[] doGetReadTargetShards() {
}
else {
shards.put(shard, node);
break;
break;
}
}
}
}
return result;
}

public Map<Shard, Node> getWriteTargetPrimaryShards() {
public Map<Shard, Node> getWriteTargetPrimaryShards(boolean clientNodesOnly) {
for (int retries = 0; retries < 3; retries++) {
Map<Shard, Node> map = doGetWriteTargetPrimaryShards();
Map<Shard, Node> map = doGetWriteTargetPrimaryShards(clientNodesOnly);
if (map != null) {
return map;
}
}
throw new EsHadoopIllegalStateException("Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable");
}

protected Map<Shard, Node> doGetWriteTargetPrimaryShards() {
protected Map<Shard, Node> doGetWriteTargetPrimaryShards(boolean clientNodesOnly) {
List<List<Map<String, Object>>> info = client.targetShards(resourceW.index());
Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>();
Map<String, Node> nodes = client.getNodes();
Map<String, Node> nodes = client.getHttpNodes(clientNodesOnly);

for (List<Map<String, Object>> shardGroup : info) {
// consider only primary shards
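Both getReadTargetShards and getWriteTargetPrimaryShards share the same retry shape: the shard table and the node list come from separate REST calls, so a node can disappear between them; the lookup is retried up to three times and the first consistent snapshot wins. Extracted as a generic sketch:

import java.util.concurrent.Callable;

// Generic sketch of the three-attempt lookup used by both shard queries.
final class RetryThrice {

    static <T> T get(Callable<T> snapshot, String failureMessage) throws Exception {
        for (int retries = 0; retries < 3; retries++) {
            T result = snapshot.call();
            if (result != null) {
                return result; // first consistent snapshot wins
            }
        }
        throw new IllegalStateException(failureMessage);
    }
}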
@@ -245,7 +245,7 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
}
}
else {
Object[] result = client.getReadTargetShards();
Object[] result = client.getReadTargetShards(settings.getNodesClientOnly());
overlappingShards = (Boolean) result[0];
targetShards = (Map<Shard, Node>) result[1];

@@ -406,7 +406,7 @@ private static RestRepository initSingleIndex(Settings settings, int currentInst
}
}

// if client-nodes are used, simply use the underlying
// if client-nodes are used, simply use the underlying nodes
if (settings.getNodesClientOnly()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]",
@@ -419,7 +419,7 @@ private static RestRepository initSingleIndex(Settings settings, int currentInst
// no routing necessary; select the relevant target shard/node
Map<Shard, Node> targetShards = Collections.emptyMap();

targetShards = repository.getWriteTargetPrimaryShards();
targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
repository.close();

Assert.isTrue(!targetShards.isEmpty(),
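Note that with client-only routing the writer skips the primary-shard lookup entirely; each partition writer instance is simply pointed at one of the nodes, which by this point have been filtered down to client nodes. A plausible round-robin assignment — hypothetical, the actual selection logic in this file may differ:

import java.util.List;

// Hypothetical round-robin assignment of writer instances to client nodes;
// the actual selection in RestService may differ.
final class WriterAssignment {

    static String nodeFor(int writerInstance, List<String> clientNodes) {
        if (clientNodes.isEmpty()) {
            throw new IllegalArgumentException("no client nodes available");
        }
        return clientNodes.get(writerInstance % clientNodes.size());
    }
}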
61 changes: 39 additions & 22 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/ShardSorter.java
@@ -18,8 +18,21 @@
*/
package org.elasticsearch.hadoop.rest;

import java.util.*;

import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.util.Assert;
@@ -34,7 +47,7 @@

abstract class ShardSorter {

public static Map<Shard, Node> find(List<List<Map<String, Object>>> targetShards, Map<String, Node> nodes) {
public static Map<Shard, Node> find(List<List<Map<String, Object>>> targetShards, Map<String, Node> httpNodes, Log log) {
// group the shards per node
Map<Node, Set<Shard>> shardsPerNode = new LinkedHashMap<Node, Set<Shard>>();
// nodes for each shard
@@ -44,7 +57,11 @@ public static Map<Shard, Node> find(List<List<Map<String, Object>>> targetShards
for (List<Map<String, Object>> shardGroup : targetShards) {
for (Map<String, Object> shardData : shardGroup) {
Shard shard = new Shard(shardData);
Node node = nodes.get(shard.getNode());
Node node = httpNodes.get(shard.getNode());
if (node == null) {
log.warn(String.format("Cannot find node with id [%s] (is HTTP enabled?) from shard [%s] in nodes [%s]; layout [%s]", shard.getNode(), shard, httpNodes, targetShards));
return null;
}

// node -> shards
Set<Shard> shardSet = shardsPerNode.get(node);
@@ -65,7 +82,7 @@ public static Map<Shard, Node> find(List<List<Map<String, Object>>> targetShards
}
}

return checkCombo(nodes.values(), shardsPerNode, targetShards.size());
return checkCombo(httpNodes.values(), shardsPerNode, targetShards.size());
}

private static Map<Shard, Node> checkCombo(Collection<Node> nodes, Map<Node, Set<Shard>> shardsPerNode, int numberOfShards) {
@@ -80,16 +97,16 @@ private static Map<Shard, Node> checkCombo(Collection<Node> nodes, Map<Node, Set

for (Node node : set) {
Set<Shard> associatedShards = shardsPerNode.get(node);
if (associatedShards != null) {
for (Shard shard : associatedShards) {
if (!shards.add(SimpleShard.from(shard))) {
overlappingShards = true;
break;
}
}
if (overlappingShards) {
break;
}
if (associatedShards != null) {
for (Shard shard : associatedShards) {
if (!shards.add(SimpleShard.from(shard))) {
overlappingShards = true;
break;
}
}
if (overlappingShards) {
break;
}
}
}
// bingo!
@@ -98,13 +115,13 @@ private static Map<Shard, Node> checkCombo(Collection<Node> nodes, Map<Node, Set
for (Node node : set) {
Set<Shard> associatedShards = shardsPerNode.get(node);
if (associatedShards != null) {
// to avoid shard overlapping, only add one request for each shard # (regardless of its index) per node
Set<Integer> shardIds = new HashSet<Integer>();
for (Shard potentialShard : associatedShards) {
if (shardIds.add(potentialShard.getName())) {
finalShards.put(potentialShard, node);
}
}
// to avoid shard overlapping, only add one request for each shard # (regardless of its index) per node
Set<Integer> shardIds = new HashSet<Integer>();
for (Shard potentialShard : associatedShards) {
if (shardIds.add(potentialShard.getName())) {
finalShards.put(potentialShard, node);
}
}
}
}

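ShardSorter.checkCombo searches node combinations for one whose shards cover all target shards without serving the same shard twice; the overlap test at its core is a plain set-insertion check on (index, shard id) pairs. In isolation:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the overlap test at the heart of checkCombo: a node combination
// is rejected if the same (index, shard id) pair would be served twice.
final class OverlapCheck {

    static boolean hasOverlap(List<String> indexShardPairs) {
        Set<String> seen = new HashSet<String>();
        for (String pair : indexShardPairs) {
            if (!seen.add(pair)) {
                return true; // duplicate shard -> overlapping combination
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasOverlap(Arrays.asList("idx/0", "idx/1", "idx/0"))); // true
    }
}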
@@ -27,6 +27,7 @@
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.LogFactory;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.hadoop.serialization.dto.Node;
@@ -118,7 +119,7 @@ private Map<String, String> topology(Map<String, List<String>> shardsPerNode) {
List<List<Map<String, Object>>> targetShards = new ArrayList<List<Map<String, Object>>>();
targetShards.addAll(shardGroups.values());

Map<Shard, Node> find = ShardSorter.find(targetShards, nodes);
Map<Shard, Node> find = ShardSorter.find(targetShards, nodes, LogFactory.getLog(ShardSorting.class));
if (find.isEmpty()) {
return Collections.emptyMap();
}
