Skip to content

Commit

Permalink
Fix 'pinning' of tasks to the target node
Browse files Browse the repository at this point in the history
Due to the way the settings are used, the node pinned for a given
task is being ignored and the list of discovered nodes is used
instead. This commit addresses that and separates the internal
properties used by the library so they are no longer overloaded
(which could lead to errors).

Fix #256
  • Loading branch information
costin committed Sep 1, 2014
1 parent a86cf07 commit 8ba8fa6
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 175 deletions.
Expand Up @@ -29,14 +29,16 @@ public class EsEmbeddedServer {

private final Node node;

public EsEmbeddedServer(String clusterName, String dataPath, String httpRange, String transportRange) {
public EsEmbeddedServer(String clusterName, String dataPath, String httpRange, String transportRange, boolean hasSlave) {
Properties props = new Properties();
props.setProperty("path.data", dataPath);
props.setProperty("http.port", httpRange);
props.setProperty("transport.tcp.port", transportRange);
props.setProperty("es.index.store.type", "memory");
props.setProperty("gateway.type", "none");
props.setProperty("discovery.zen.ping.multicast.enabled", "false");
if (!hasSlave) {
props.setProperty("discovery.zen.ping.multicast.enabled", "false");
}
props.setProperty("script.disable_dynamic", "false");

Settings settings = ImmutableSettings.settingsBuilder().put(props).build();
Expand Down
6 changes: 3 additions & 3 deletions mr/src/itest/java/org/elasticsearch/hadoop/LocalEs.java
Expand Up @@ -39,7 +39,7 @@ public class LocalEs extends ExternalResource {
public static final String DATA_PORTS_SLAVE = "9700-9799";
public static final String TRANSPORT_PORTS_SLAVE = "9800-9899";

private boolean USE_SLAVE = false;
private boolean USE_SLAVE = true;
private boolean disabled = false;

@Override
Expand All @@ -63,13 +63,13 @@ protected void before() throws Throwable {

if (master == null) {
System.out.println("Starting Elasticsearch Master...");
master = new EsEmbeddedServer(CLUSTER_NAME, ES_DATA_PATH, DATA_PORTS, TRANSPORT_PORTS);
master = new EsEmbeddedServer(CLUSTER_NAME, ES_DATA_PATH, DATA_PORTS, TRANSPORT_PORTS, USE_SLAVE);
master.start();
}

if (USE_SLAVE && slave == null) {
System.out.println("Starting Elasticsearch Slave...");
slave = new EsEmbeddedServer(CLUSTER_NAME, ES_DATA_PATH, DATA_PORTS, TRANSPORT_PORTS);
slave = new EsEmbeddedServer(CLUSTER_NAME, ES_DATA_PATH, DATA_PORTS_SLAVE, TRANSPORT_PORTS_SLAVE, USE_SLAVE);
slave.start();
}
}
Expand Down
Expand Up @@ -24,7 +24,10 @@
public interface InternalConfigurationOptions extends ConfigurationOptions {

// field projection pushed down to the MR layer (internal use only)
String INTERNAL_ES_TARGET_FIELDS = "es.internal.mr.target.fields";
// legacy key: discovered hosts (superseded by the two keys below — see commit note on separating internal properties)
String INTERNAL_ES_HOSTS = "es.internal.hosts";
// nodes found through cluster discovery, kept separate from the user-declared es.nodes list
String INTERNAL_ES_DISCOVERED_NODES = "es.internal.discovered.nodes";
// the single node a task is pinned to; must not be overridden by the discovered-nodes list
String INTERNAL_ES_PINNED_NODE = "es.internal.pinned.node";

// Elasticsearch cluster version detected at runtime
String INTERNAL_ES_VERSION = "es.internal.es.version";
}
20 changes: 10 additions & 10 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -276,11 +276,16 @@ public boolean getNetworkSocksUseSystemProperties() {
return Booleans.parseBoolean(getProperty(ES_NET_PROXY_SOCKS_USE_SYSTEM_PROPS, ES_NET_PROXY_SOCKS_USE_SYSTEM_PROPS_DEFAULT));
}

public Settings setHosts(String hosts) {
public Settings setNodes(String hosts) {
setProperty(ES_NODES, hosts);
return this;
}

@Deprecated
public Settings setHosts(String hosts) {
return setNodes(hosts);
}

public Settings setPort(int port) {
setProperty(ES_PORT, "" + port);
return this;
Expand Down Expand Up @@ -313,11 +318,6 @@ public String getResourceWrite() {
return getProperty(ES_RESOURCE_WRITE, getResource());
}

String getTargetHosts() {
String hosts = getProperty(INTERNAL_ES_HOSTS);
return (StringUtils.hasText(hosts) ? hosts : getNodes());
}

public String getQuery() {
return getProperty(ES_QUERY);
}
Expand Down Expand Up @@ -356,16 +356,16 @@ public Settings merge(Properties properties) {

return this;
}

public Settings merge(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return this;
}

for (Entry<String, String> entry : map.entrySet()) {
setProperty(entry.getKey(), entry.getValue());
}
setProperty(entry.getKey(), entry.getValue());
}

return this;
}

Expand Down
20 changes: 7 additions & 13 deletions mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
Expand Up @@ -52,10 +52,6 @@
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.builder.ValueReader;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

/**
Expand Down Expand Up @@ -195,21 +191,18 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
log.trace(String.format("Init shard reader w/ settings %s", esSplit.settings));
}

// override the global settings to communicate directly with the target node
settings.setHosts(esSplit.nodeIp).setPort(esSplit.httpPort);

this.esSplit = esSplit;

// initialize mapping/ scroll reader
InitializationUtils.setValueReaderIfNotSet(settings, WritableValueReader.class, log);

PartitionDefinition part = new PartitionDefinition(esSplit.nodeIp, esSplit.httpPort, esSplit.nodeName, esSplit.nodeId, esSplit.shardId, settings.save(), esSplit.mapping);
PartitionReader partitionReader = RestService.createReader(settings, part, log);

this.scrollReader = partitionReader.scrollReader;
this.client = partitionReader.client;
this.queryBuilder = partitionReader.queryBuilder;

// heart-beat
beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);

Expand Down Expand Up @@ -412,11 +405,12 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
Settings settings = HadoopSettingsManager.loadFrom(job);
Collection<PartitionDefinition> partitions = RestService.findPartitions(settings, log);
ShardInputSplit[] splits = new ShardInputSplit[partitions.size()];

int index = 0;
for (PartitionDefinition part : partitions) {
splits[index++] = new ShardInputSplit(part.nodeIp, part.nodePort, part.nodeId, part.nodeName, part.shardId, part.serializedMapping, part.serializedSettings);
}
splits[index++] = new ShardInputSplit(part.nodeIp, part.nodePort, part.nodeId, part.nodeName, part.shardId,
part.serializedMapping, part.serializedSettings);
}
log.info(String.format("Created [%d] shard-splits", splits.length));
return splits;
}
Expand Down
Expand Up @@ -18,19 +18,15 @@
*/
package org.elasticsearch.hadoop.rest;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.serialization.BytesConverter;
import org.elasticsearch.hadoop.serialization.builder.ContentBuilder;
import org.elasticsearch.hadoop.serialization.builder.NoOpValueWriter;
Expand Down Expand Up @@ -63,13 +59,7 @@ public static boolean discoverNodesIfNeeded(Settings settings, Log log) {
log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
}

// clean-up and merge
Set<String> nodes = new LinkedHashSet<String>();
nodes.addAll(SettingsUtils.nodes(settings));
nodes.addAll(discoveredNodes);

// save result
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_HOSTS, StringUtils.concatenate(nodes, ","));
SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
bootstrap.close();

return true;
Expand Down
39 changes: 30 additions & 9 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/NetworkClient.java
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.hadoop.rest;

import java.io.Closeable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.hadoop.rest.stats.StatsAware;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ByteSequence;
import org.elasticsearch.hadoop.util.SettingsUtils;


public class NetworkClient implements StatsAware, Closeable {
Expand All @@ -41,14 +43,28 @@ public class NetworkClient implements StatsAware, Closeable {
private final Map<String, Throwable> failedNodes = new LinkedHashMap<String, Throwable>();

private Transport currentTransport;
private String currentUri;
private String currentNode;
private int nextClient = 0;

private final Stats stats = new Stats();

public NetworkClient(Settings settings, List<String> hostURIs) {
public NetworkClient(Settings settings) {
this.settings = settings.copy();
this.nodes = hostURIs;
this.nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
// shuffle the list of nodes so in case of failures, the fallback is spread
Collections.shuffle(nodes);

if (SettingsUtils.hasPinnedNode(settings)) {
// move pinned node in front to be selected (only once)
String pinnedNode = SettingsUtils.getPinnedNode(settings);

if (log.isDebugEnabled()) {
log.debug("Opening (pinned) network client to " + pinnedNode);
}

nodes.remove(pinnedNode);
nodes.add(0, pinnedNode);
}

selectNextNode();

Expand All @@ -65,9 +81,9 @@ private boolean selectNextNode() {
}

closeTransport();
currentUri = nodes.get(nextClient++);
settings.setHosts(currentUri);
currentTransport = new CommonsHttpTransport(settings, currentUri);
currentNode = nodes.get(nextClient++);
SettingsUtils.pinNode(settings, currentNode);
currentTransport = new CommonsHttpTransport(settings, currentNode);
return true;
}

Expand All @@ -87,16 +103,21 @@ public Response execute(Request request) {
}
} catch (Exception ex) {
if (log.isTraceEnabled()) {
log.trace(String.format("Caught exception while performing request [%s][%s] - falling back to the next node in line...", currentUri, request.path()), ex);
log.trace(
String.format(
"Caught exception while performing request [%s][%s] - falling back to the next node in line...",
currentNode, request.path()), ex);
}

String failed = currentUri;
String failed = currentNode;

failedNodes.put(failed, ex);

newNode = selectNextNode();

log.error(String.format("Node [%s] failed (%s); " + (newNode ? "selected next node [" + currentUri + "]" : "no other nodes left - aborting..."), ex.getMessage(), failed));
log.error(String.format("Node [%s] failed (%s); "
+ (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
ex.getMessage(), failed));

if (!newNode) {
throw new EsHadoopNoNodesLeftException(failedNodes);
Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TrackingBytesArray;
import org.elasticsearch.hadoop.util.unit.TimeValue;
Expand Down Expand Up @@ -77,7 +76,7 @@ public enum HEALTH {
}

public RestClient(Settings settings) {
network = new NetworkClient(settings, SettingsUtils.nodes(settings));
network = new NetworkClient(settings);

scrollKeepAlive = TimeValue.timeValueMillis(settings.getScrollKeepAlive());
indexReadMissingAsEmpty = settings.getIndexReadMissingAsEmpty();
Expand Down

0 comments on commit 8ba8fa6

Please sign in to comment.