Skip to content

Commit

Permalink
revise configuration object to simplify setting storage
Browse files Browse the repository at this point in the history
all settings are saved directly - if that's not desired, use #copy
relates to #155
  • Loading branch information
costin committed Apr 8, 2014
1 parent 510a125 commit a155897
Show file tree
Hide file tree
Showing 18 changed files with 115 additions and 72 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;

import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerializationProps;
Expand Down Expand Up @@ -121,4 +122,22 @@ static Properties extractOriginalProperties(Properties copy) {
throw new IllegalArgumentException("Cannot retrieve actual configuration", ex);
}
}

/**
 * Copies the tap-supplied connection parameters into the given {@link Settings}.
 * Arguments that were not provided (blank strings, non-positive port) are
 * skipped so the values already present in the configuration are preserved.
 *
 * @param settings configuration to update
 * @param nodes    comma-separated Elasticsearch nodes; ignored when blank
 * @param port     Elasticsearch HTTP port; ignored when {@code <= 0}
 * @param resource target index/type; ignored when blank
 * @param query    query applied when reading; ignored when blank
 */
static void init(Settings settings, String nodes, int port, String resource, String query) {
    if (StringUtils.hasText(nodes)) {
        settings.setHosts(nodes);
    }
    if (port > 0) {
        settings.setPort(port);
    }
    // resource and query writes are independent of one another
    if (StringUtils.hasText(resource)) {
        settings.setResource(resource);
    }
    if (StringUtils.hasText(query)) {
        settings.setQuery(query);
    }
}
}
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
Expand Down Expand Up @@ -60,14 +61,16 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje

private final String index;
private final String query;
private final String host;
private final String nodes;
private final int port;
private boolean IS_ES_10;

EsHadoopScheme(String host, int port, String index, String query, Fields fields) {
private static Log log = LogFactory.getLog(EsHadoopScheme.class);

EsHadoopScheme(String nodes, int port, String index, String query, Fields fields) {
this.index = index;
this.query = query;
this.host = host;
this.nodes = nodes;
this.port = port;
if (fields != null) {
setSinkFields(fields);
Expand Down Expand Up @@ -132,14 +135,15 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe

// NB: we need to set this property even though it is not being used - and since a URI causes problems, use only the resource/file
//conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getTargetResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResource());
HadoopCfgUtils.setOutputCommitterClass(conf, EsOutputFormat.ESOldAPIOutputCommitter.class.getName());

}

// NOTE(review): the name is historical - this now seeds the scheme's full
// connection parameters (nodes/port/index/query) into the job configuration,
// not just a target URI. Consider renaming in a follow-up.
private void initTargetUri(JobConf conf) {
    // push this scheme's parameters on top of the settings loaded from the job conf;
    // blank/zero values leave the existing configuration untouched (see CascadingUtils.init)
    CascadingUtils.init(SettingsManager.loadFrom(conf), nodes, port, index, query);
    if (log.isTraceEnabled()) {
        log.trace("Initialized configuration " + HadoopCfgUtils.asProperties(conf));
    }
}

@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -123,12 +123,12 @@ public void sinkConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, Sc

// Lazily creates the REST client used by this tap; subsequent calls are no-ops.
private void initClient(Properties props) {
    if (client == null) {
        Settings settings = SettingsManager.loadFrom(props);
        // apply the tap-level host/port/resource/query on top of the loaded properties
        CascadingUtils.init(settings, host, port, resource, query);

        // register Cascading-specific (de)serialization defaults - each is a no-op if already configured
        InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
        InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, LogFactory.getLog(EsTap.class));
        InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LogFactory.getLog(EsTap.class));
        client = new RestRepository(settings);
    }
}
Expand Down
Expand Up @@ -71,7 +71,6 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
// will be closed when the query is finished
InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);
settings.save();

RestRepository client = new RestRepository(settings);
Field mapping = client.getMapping();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/cascading/EsTap.java
Expand Up @@ -64,6 +64,10 @@ public EsTap(String host, int port, String resource) {
this(host, port, resource, null, null);
}

/**
 * Creates a tap against the given resource, declaring the specified fields.
 *
 * @param host     Elasticsearch node; blank values fall back to the configuration default
 * @param port     Elasticsearch HTTP port; values {@code <= 0} fall back to the configuration default
 * @param resource target index/type
 * @param fields   fields declared by this tap
 */
public EsTap(String host, int port, String resource, Fields fields) {
    this(host, port, resource, null, fields);
}

/**
 * Creates a tap against the given resource, filtered by the given query.
 *
 * @param host     Elasticsearch node; blank values fall back to the configuration default
 * @param port     Elasticsearch HTTP port; values {@code <= 0} fall back to the configuration default
 * @param resource target index/type
 * @param query    query applied when reading from the resource
 */
public EsTap(String host, int port, String resource, String query) {
    this(host, port, resource, query, null);
}
Expand Down
Expand Up @@ -23,8 +23,8 @@
*/
// Internal-only configuration keys; these are managed by the connector itself
// and are not meant to be set directly by users.
public interface InternalConfigurationOptions extends ConfigurationOptions {

    String INTERNAL_ES_TARGET_RESOURCE = "es.internal.mr.target.resource";
    // field projection computed for the target resource
    String INTERNAL_ES_TARGET_FIELDS = "es.internal.mr.target.fields";
    // discovered hosts
    String INTERNAL_ES_HOSTS = "es.internal.hosts";
    String INTERNAL_ES_VERSION = "es.internal.es.version";
}
64 changes: 34 additions & 30 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -24,7 +24,6 @@
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.unit.Booleans;
import org.elasticsearch.hadoop.util.unit.ByteSizeValue;
Expand Down Expand Up @@ -201,17 +200,17 @@ public TimeValue getHeartBeatLead() {
}

// Sets the Elasticsearch nodes (es.nodes); returns this for chaining.
// NOTE(review): the write is stored directly in the underlying properties -
// use #copy first if the original configuration must stay untouched.
public Settings setHosts(String hosts) {
this.targetHosts = hosts;
setProperty(ES_NODES, hosts);
return this;
}

// Sets the Elasticsearch HTTP port (es.port); returns this for chaining.
public Settings setPort(int port) {
this.port = port;
setProperty(ES_PORT, "" + port);
return this;
}

// Sets the target resource/index (es.resource); returns this for chaining.
public Settings setResource(String index) {
this.targetResource = index;
setProperty(ES_RESOURCE, index);
return this;
}

Expand All @@ -221,11 +220,16 @@ public Settings setQuery(String query) {
}

// aggregate the resource - computed / set / properties
public String getTargetResource() {
String resource = getProperty(INTERNAL_ES_TARGET_RESOURCE);
return (StringUtils.hasText(targetResource) ? targetResource : StringUtils.hasText(resource) ? resource : getProperty(ES_RESOURCE));
// public String getTargetResource() {
// String resource = getProperty(INTERNAL_ES_TARGET_RESOURCE);
// return (StringUtils.hasText(targetResource) ? targetResource : StringUtils.hasText(resource) ? resource : getProperty(ES_RESOURCE));
// }

/** Returns the configured resource (the {@code es.resource} property), or {@code null} if unset. */
public String getResource() {
return getProperty(ES_RESOURCE);
}


String getTargetHosts() {
String hosts = getProperty(INTERNAL_ES_HOSTS);
return StringUtils.hasText(targetHosts) ? targetHosts : (StringUtils.hasText(hosts) ? hosts : getNodes());
Expand All @@ -235,21 +239,21 @@ public String getQuery() {
return getProperty(ES_QUERY);
}

public Settings cleanHosts() {
setProperty(INTERNAL_ES_HOSTS, "");
return this;
}

public Settings cleanResource() {
setProperty(INTERNAL_ES_TARGET_RESOURCE, "");
return this;
}

public Settings clean() {
cleanResource();
cleanHosts();
return this;
}
// public Settings cleanHosts() {
// setProperty(INTERNAL_ES_HOSTS, "");
// return this;
// }
//
// public Settings cleanResource() {
// setProperty(INTERNAL_ES_TARGET_RESOURCE, "");
// return this;
// }
//
// public Settings clean() {
// cleanResource();
// cleanHosts();
// return this;
// }

public abstract InputStream loadResource(String location);

Expand All @@ -258,14 +262,14 @@ public Settings clean() {
/**
* Saves the settings state after validating them.
*/
public void save() {
String resource = getTargetResource();
String hosts = getTargetHosts();

Assert.hasText(resource, String.format("No resource (index/query/location) ['%s'] specified", ES_RESOURCE));
setProperty(INTERNAL_ES_TARGET_RESOURCE, resource);
setProperty(INTERNAL_ES_HOSTS, hosts);
}
// public void save() {
// String resource = getTargetResource();
// //String hosts = getTargetHosts();
//
// Assert.hasText(resource, String.format("No resource (index/query/location) ['%s'] specified", ES_RESOURCE));
// setProperty(INTERNAL_ES_TARGET_RESOURCE, resource);
// //setProperty(INTERNAL_ES_HOSTS, hosts);
// }

protected String getProperty(String name, String defaultValue) {
String value = getProperty(name);
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String

private void init(TableDesc tableDesc) {
Configuration cfg = getConf();
Settings settings = SettingsManager.loadFrom(cfg).merge(tableDesc.getProperties()).clean();
Settings settings = SettingsManager.loadFrom(cfg).merge(tableDesc.getProperties());

// NB: ESSerDe is already initialized at this stage but should still have a reference to the same cfg object
// NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes
Expand All @@ -104,8 +104,6 @@ private void init(TableDesc tableDesc) {

settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS,
StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
// save column names to apply projection
settings.save();

// replace the default committer when using the old API
HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName());
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
Expand Up @@ -178,10 +178,11 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
}

void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable) {
Settings settings = SettingsManager.loadFrom(cfg);
// get a copy to override the host/port
Settings settings = SettingsManager.loadFrom(cfg).copy();

// override the global settings to communicate directly with the target node
settings.cleanHosts().setHosts(esSplit.nodeIp).setPort(esSplit.httpPort);
settings.setHosts(esSplit.nodeIp).setPort(esSplit.httpPort);

this.esSplit = esSplit;

Expand Down Expand Up @@ -396,7 +397,6 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
Settings settings = SettingsManager.loadFrom(job);
InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);
settings.save();

RestRepository client = new RestRepository(settings);

Expand All @@ -406,13 +406,13 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit

if (!indexExists) {
if (settings.getIndexReadMissingAsEmpty()) {
log.info(String.format("Index [%s] missing - treating it as empty", settings.getTargetResource()));
log.info(String.format("Index [%s] missing - treating it as empty", settings.getResource()));
targetShards = Collections.emptyMap();
}
else {
client.close();
throw new IllegalArgumentException(
String.format("Index [%s] missing and settings [%s] is set to false", settings.getTargetResource(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL));
String.format("Index [%s] missing and settings [%s] is set to false", settings.getResource(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL));
}
}
else {
Expand All @@ -427,7 +427,7 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
Field mapping = client.getMapping();
//TODO: implement this more efficiently
savedMapping = IOUtils.serializeToBase64(mapping);
log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getTargetResource()));
log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getResource()));
}

client.close();
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
Expand Up @@ -161,7 +161,7 @@ protected void init() throws IOException {
log.trace(String.format("ESRecordWriter instance [%s] initiating discovery of target shard...", currentInstance));
}

Settings settings = SettingsManager.loadFrom(cfg);
Settings settings = SettingsManager.loadFrom(cfg).copy();

InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, log);
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, log);
Expand All @@ -177,7 +177,7 @@ protected void init() throws IOException {
beat.start();

client = new RestRepository(settings);
resource = settings.getTargetResource();
resource = settings.getResource();

// create the index if needed
if (client.touch()) {
Expand All @@ -202,7 +202,7 @@ protected void init() throws IOException {
Node targetNode = targetShards.get(chosenShard);

// override the global settings to communicate directly with the target node
settings.cleanHosts().setHosts(targetNode.getIpAddress()).setPort(targetNode.getHttpPort());
settings.setHosts(targetNode.getIpAddress()).setPort(targetNode.getHttpPort());
client = new RestRepository(settings);
uri = SettingsUtils.nodes(settings).get(0);

Expand Down Expand Up @@ -287,7 +287,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf cfg) throws IOException
// NB: all changes to the config objects are discarded before the job is submitted if _the old MR api_ is used
private void init(Configuration cfg) throws IOException {
Settings settings = SettingsManager.loadFrom(cfg);
Assert.hasText(settings.getTargetResource(),
Assert.hasText(settings.getResource(),
String.format("No resource ['%s'] (index/query/location) specified", ES_RESOURCE));

// lazy-init
Expand Down
1 change: 0 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java
Expand Up @@ -156,7 +156,6 @@ private void init(String location, Job job) {
changed |= InitializationUtils.setFieldExtractorIfNotSet(settings, PigFieldExtractor.class, log);

IS_ES_10 = SettingsUtils.isEs10(settings);
settings.save();
}

@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -102,7 +102,7 @@ public static void checkIndexExistence(Settings settings, RestRepository client)
if (!client.indexExists()) {
client.close();
throw new IllegalArgumentException(String.format("Target index [%s] does not exist and auto-creation is disabled [setting '%s' is '%s']",
settings.getTargetResource(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
settings.getResource(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
}
} catch (IOException ex) {
throw new IllegalStateException("Cannot check index existance", ex);
Expand Down Expand Up @@ -139,11 +139,11 @@ public static <T> void saveSchemaIfNeeded(Object conf, ValueWriter<T> schemaWrit
RestRepository client = new RestRepository(settings);
if (!client.indexExists()) {
if (schemaWriter == null) {
log.warn(String.format("No mapping found [%s] and no schema found; letting Elasticsearch perform auto-mapping...", settings.getTargetResource()));
log.warn(String.format("No mapping found [%s] and no schema found; letting Elasticsearch perform auto-mapping...", settings.getResource()));
}
else {
log.info(String.format("No mapping found [%s], creating one based on given schema",
settings.getTargetResource()));
settings.getResource()));
ContentBuilder builder = ContentBuilder.generate(schemaWriter).value(schema).flush();
BytesArray content = ((FastByteArrayOutputStream) builder.content()).bytes();
builder.close();
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/org/elasticsearch/hadoop/rest/NetworkClient.java
Expand Up @@ -65,8 +65,6 @@ private boolean selectNextNode() {
currentUri = nodes.get(nextClient++);
close();

//TODO: split host/port
settings.cleanHosts();
settings.setHosts(currentUri);
currentTransport = new CommonsHttpTransport(settings, currentUri);
return true;
Expand All @@ -90,14 +88,15 @@ public Response execute(Request request) throws IOException {
if (log.isTraceEnabled()) {
log.trace(String.format("Caught exception while performing request [%s][%s] - falling back to the next node in line...", currentUri, request.path()), ex);
}

String failed = currentUri;
newNode = selectNextNode();

log.error(String.format("Node [%s] failed; " + (newNode ? "selected next node [" + currentUri + "]" : "no other nodes left - aborting..."), failed));

if (!newNode) {
throw new IOException("Out of nodes and retries; caught exception", ex);
}
if (log.isDebugEnabled()) {
log.debug(String.format("[%s] [%s] failed on node [%s]; selecting next node...",
request.method().name(), request.path(), currentUri));
}
}
} while (newNode);

Expand Down

0 comments on commit a155897

Please sign in to comment.