Skip to content

Commit

Permalink
Improve node 'pinning' when dealing with client-only nodes
Browse files Browse the repository at this point in the history
relates #375
  • Loading branch information
costin committed Feb 13, 2015
1 parent b358ce9 commit 3eaa4e2
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 19 deletions.
Expand Up @@ -262,7 +262,7 @@ public Map<String, Node> getHttpNodes(boolean allowNonHttp) {

for (Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
Node node = new Node(entry.getKey(), entry.getValue());
if (allowNonHttp || node.hasHttp()) {
if (allowNonHttp || (node.hasHttp() && !node.isClient())) {
nodes.put(entry.getKey(), node);
}
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.rest.stats.StatsAware;
Expand Down Expand Up @@ -287,6 +288,14 @@ protected Object[] doGetReadTargetShards(boolean clientNodesOnly) {
// if client-nodes routing is used, allow non-http clients
Map<String, Node> httpNodes = client.getHttpNodes(clientNodesOnly);

if (httpNodes.isEmpty()) {
String msg = "No HTTP-enabled data nodes found";
if (!settings.getNodesClientOnly()) {
msg += String.format("; if you are using client-only nodes make sure to configure es-hadoop as such through [%s] property", ConfigurationOptions.ES_NODES_CLIENT_ONLY);
}
new EsHadoopIllegalStateException(msg);
}

Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>();

boolean overlappingShards = false;
Expand Down
19 changes: 9 additions & 10 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java
Expand Up @@ -285,16 +285,12 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {

if (!SettingsUtils.hasPinnedNode(settings)) {
// pin node only if client-routing is disabled; otherwise simply go through them...
if (!settings.getNodesClientOnly()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Partition reader instance [%s] assigned to [%s]:[%s]",
partition, partition.nodeId, partition.nodePort));
}

SettingsUtils.pinNode(settings, partition.nodeIp, partition.nodePort);
if (log.isDebugEnabled()) {
log.debug(String.format("Partition reader instance [%s] assigned to [%s]:[%s]", partition,
partition.nodeId, partition.nodePort));
}

SettingsUtils.pinNode(settings, partition.nodeIp, partition.nodePort);
}

ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
Expand All @@ -314,10 +310,12 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
RestRepository client = new RestRepository(settings);

if (settings.getNodesClientOnly()) {
String clientNode = client.getRestClient().getCurrentNode();
if (log.isDebugEnabled()) {
log.debug(String.format("Client-node routing detected; partition reader instance [%s] assigned to [%s]",
partition, client.getRestClient().getCurrentNode()));
partition, clientNode));
}
SettingsUtils.pinNode(settings, clientNode);
}

// take into account client node routing
Expand Down Expand Up @@ -408,9 +406,10 @@ private static RestRepository initSingleIndex(Settings settings, int currentInst

// if client-nodes are used, simply use the underlying nodes
if (settings.getNodesClientOnly()) {
String clientNode = repository.getRestClient().getCurrentNode();
if (log.isDebugEnabled()) {
log.debug(String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]",
currentInstance, repository.getRestClient().getCurrentNode()));
currentInstance, clientNode));
}

return repository;
Expand Down
Expand Up @@ -60,7 +60,7 @@ public static Map<Shard, Node> find(List<List<Map<String, Object>>> targetShards
Node node = httpNodes.get(shard.getNode());
if (node == null) {
log.warn(String.format("Cannot find node with id [%s] (is HTTP enabled?) from shard [%s] in nodes [%s]; layout [%s]", shard.getNode(), shard, httpNodes, targetShards));
return null;
return Collections.emptyMap();
}

// node -> shards
Expand Down
Expand Up @@ -36,6 +36,12 @@ public Node(String id, Map<String, Object> data) {
name = data.get("name").toString();
Object http = data.get("http_address");
hasHttp = (http != null);

attributes = (Map<String, Object>) data.get("attributes");
if (attributes != null) {
isClient = ("false".equals(attributes.get("data")) && "false".equals(attributes.get("master")));
}

if (!hasHttp) {
return;
}
Expand All @@ -45,11 +51,6 @@ public Node(String id, Map<String, Object> data) {
int endIndex = httpAddr.indexOf(":");
ipAddress = httpAddr.substring(startIndex, endIndex);
httpPort = Integer.valueOf(httpAddr.substring(endIndex + 1, httpAddr.indexOf("]")));

attributes = (Map<String, Object>) data.get("attributes");
if (attributes != null) {
isClient = ("false".equals(attributes.get("data")) && "false".equals(attributes.get("master")));
}
}

public boolean hasHttp() {
Expand Down
Expand Up @@ -49,7 +49,9 @@ public static void pinNode(Settings settings, String node) {
}

public static void pinNode(Settings settings, String node, int port) {
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_PINNED_NODE, qualifyNode(node, port));
if (StringUtils.hasText(node) && port > 0) {
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_PINNED_NODE, qualifyNode(node, port));
}
}

public static boolean hasPinnedNode(Settings settings) {
Expand Down
Expand Up @@ -33,7 +33,8 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](

override def getPreferredLocations(split: Partition): Seq[String] = {
val esSplit = split.asInstanceOf[EsPartition]
Seq(esSplit.esPartition.nodeIp)
val ip = esSplit.esPartition.nodeIp
if (ip != null) Seq(ip) else Nil
}

override def checkpoint() {
Expand Down

0 comments on commit 3eaa4e2

Please sign in to comment.