Skip to content

Commit

Permalink
Do not route MapReduce reads and writes through non-data nodes
Browse files Browse the repository at this point in the history
This commit adds a configuration option es.nodes.data.only for managing whether or not reads using EsInputFormat and writes using
EsOutputFormat in Hadoop MapReduce jobs route through non-data nodes. The reason for wanting to avoid routing through non-data nodes is
because these nodes are usually resource-constrainted relative to data nodes. This configuration option is enabled by default.

Closes #512
  • Loading branch information
jasontedor authored and costin committed Sep 1, 2015
1 parent 40c46a6 commit c23a79f
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 1 deletion.
Expand Up @@ -61,6 +61,7 @@ static Settings addDefaultsToSettings(Properties flowProperties, Properties tapP

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, log);
Expand Down
4 changes: 4 additions & 0 deletions docs/src/reference/asciidoc/core/configuration.adoc
Expand Up @@ -301,6 +301,10 @@ Whether to discovery the nodes within the {es} cluster or only to use the ones g
`es.nodes.client.only` (default false)::
Whether to use {es} {ref}/modules-node.html[client nodes] (or _load-balancers_). When enabled, {eh} will route _all_ its requests (after nodes discovery, if enabled) through the _client_ nodes within the cluster. Note this typically significantly reduces the node parallelism and thus it is disabled by default.

`es.nodes.data.only` (default true)::
Whether to use {es} {ref}/modules-node.html[data nodes] only. When enabled, {eh} will route _all_ its requests (after nodes discovery, if enabled) through the _data_ nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes as these tend to be "smaller" nodes. This is enabled by default.

added[2.2]
`es.http.timeout` (default 1m)::
Timeout for HTTP/REST connections to {es}.

Expand Down
Expand Up @@ -51,6 +51,10 @@ public interface ConfigurationOptions {
String ES_NODES_CLIENT_ONLY = "es.nodes.client.only";
String ES_NODES_CLIENT_ONLY_DEFAULT = "false";

/** Data only */
String ES_NODES_DATA_ONLY = "es.nodes.data.only";
String ES_NODES_DATA_ONLY_DEFAULT = "true";

/** Elasticsearch batch size given in bytes */
String ES_BATCH_SIZE_BYTES = "es.batch.size.bytes";
String ES_BATCH_SIZE_BYTES_DEFAULT = "1mb";
Expand Down
4 changes: 4 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -61,6 +61,10 @@ public boolean getNodesClientOnly() {
return Booleans.parseBoolean(getProperty(ES_NODES_CLIENT_ONLY, ES_NODES_CLIENT_ONLY_DEFAULT));
}

public boolean getNodesDataOnly() {
return Booleans.parseBoolean(getProperty(ES_NODES_DATA_ONLY, ES_NODES_DATA_ONLY_DEFAULT));
}

public long getHttpTimeout() {
return TimeValue.parseTimeValue(getProperty(ES_HTTP_TIMEOUT, ES_HTTP_TIMEOUT_DEFAULT)).getMillis();
}
Expand Down
Expand Up @@ -109,6 +109,45 @@ public static void filterNonClientNodesIfNeeded(Settings settings, Log log) {
}
}

public static void filterNonDataNodesIfNeeded(Settings settings, Log log) {
if (!settings.getNodesDataOnly()) {
return;
}

RestClient bootstrap = new RestClient(settings);
try {
List<String> dataNodes = bootstrap.getHttpDataNodes();
if (dataNodes.isEmpty()) {
throw new EsHadoopIllegalArgumentException("Data node only routing specified but no data nodes with HTTP-enabled were found in the cluster...");
}
if (log.isDebugEnabled()) {
log.debug(String.format("Found data nodes %s", dataNodes));
}

List<String> ddNodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
// remove non-data nodes
ddNodes.retainAll(dataNodes);
if (log.isDebugEnabled()) {
log.debug(String.format("Filtered discovered only nodes %s to data-only %s", SettingsUtils.discoveredOrDeclaredNodes(settings), ddNodes));
}

if (ddNodes.isEmpty()) {
String message = "Data node only routing specified but no data nodes with HTTP-enabled available; ";
if (settings.getNodesDiscovery()) {
message += String.format("looks like the data nodes discovered have been removed; is the cluster in a stable state? %s", dataNodes);
}
else {
message += String.format("node discovery is disabled and none of nodes specified fits the criterion %s", SettingsUtils.discoveredOrDeclaredNodes(settings));
}
throw new EsHadoopIllegalArgumentException(message);
}

SettingsUtils.setDiscoveredNodes(settings, dataNodes);
} finally {
bootstrap.close();
}
}

public static String discoverEsVersion(Settings settings, Log log) {
String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
if (StringUtils.hasText(version)) {
Expand Down
15 changes: 14 additions & 1 deletion mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
Expand Up @@ -295,6 +295,19 @@ public List<String> getHttpClientNodes() {
return nodes;
}

public List<String> getHttpDataNodes() {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
List<String> nodes = new ArrayList<String>();

for (Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
Node node = new Node(entry.getKey(), entry.getValue());
if (node.isData() && node.hasHttp()) {
nodes.add(node.getInet());
}
}
return nodes;
}

@SuppressWarnings("unchecked")
public Map<String, Object> getMapping(String query) {
return (Map<String, Object>) get(query, null);
Expand Down Expand Up @@ -460,4 +473,4 @@ private void countStreamStats(InputStream content) {
public String getCurrentNode() {
return network.currentNode();
}
}
}
Expand Up @@ -228,6 +228,7 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

String savedSettings = settings.save();
Expand Down Expand Up @@ -373,6 +374,7 @@ public static PartitionWriter createWriter(Settings settings, int currentSplit,

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
Expand Down
Expand Up @@ -30,6 +30,7 @@ public class Node implements Serializable {
private int httpPort;
private Map<String, Object> attributes;
private boolean isClient = false;
private boolean isData = true;

public Node(String id, Map<String, Object> data) {
this.id = id;
Expand All @@ -42,6 +43,10 @@ public Node(String id, Map<String, Object> data) {
isClient = ("false".equals(attributes.get("data")) && "false".equals(attributes.get("master")));
}

if (attributes != null) {
isData = !"false".equals((attributes.get("data")));
}

if (!hasHttp) {
return;
}
Expand All @@ -65,6 +70,10 @@ public boolean isClient() {
return isClient;
}

public boolean isData() {
return isData;
}

public String getId() {
return id;
}
Expand Down

0 comments on commit c23a79f

Please sign in to comment.