Skip to content

Commit

Permalink
add per Tap configuration
Browse files Browse the repository at this point in the history
related to #146
  • Loading branch information
costin committed Apr 8, 2014
1 parent db4ccd1 commit b3a314d
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 20 deletions.
Expand Up @@ -205,7 +205,7 @@ static Tuple coerceToString(SinkCall<?, ?> sinkCall) {
return (CASCADING_22_AVAILABLE ? CoercibleOps.coerceToString(sinkCall) : LegacyOps.coerceToString(sinkCall));
}

public static Tap hadoopTap(String host, int port, String path, String query, Fields fields) {
return new EsHadoopTap(host, port, path, query, fields);
public static Tap hadoopTap(String host, int port, String path, String query, Fields fields, Properties props) {
return new EsHadoopTap(host, port, path, query, fields, props);
}
}
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -63,11 +64,12 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
private final String query;
private final String nodes;
private final int port;
private final Properties props;
private boolean IS_ES_10;

private static Log log = LogFactory.getLog(EsHadoopScheme.class);

EsHadoopScheme(String nodes, int port, String index, String query, Fields fields) {
EsHadoopScheme(String nodes, int port, String index, String query, Fields fields, Properties props) {
this.index = index;
this.query = query;
this.nodes = nodes;
Expand All @@ -76,6 +78,7 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
setSinkFields(fields);
setSourceFields(fields);
}
this.props = props;
}

@Override
Expand All @@ -86,7 +89,7 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
context[0] = sourceCall.getInput().createKey();
context[1] = sourceCall.getInput().createValue();
// as the tuple _might_ vary (some objects might be missing), we use a map rather than a collection
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy());
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_10 = SettingsUtils.isEs10(settings);
Expand All @@ -103,7 +106,7 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out

Object[] context = new Object[1];
// the tuple will be fixed, so we can just use a collection/index
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy());
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
IS_ES_10 = SettingsUtils.isEs10(settings);
Expand All @@ -117,7 +120,7 @@ public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out
public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
initTargetUri(conf);
conf.setInputFormat(EsInputFormat.class);
Collection<String> fields = CascadingUtils.fieldToAlias(SettingsManager.loadFrom(flowProcess.getConfigCopy()), getSourceFields());
Collection<String> fields = CascadingUtils.fieldToAlias(SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props), getSourceFields());
// load only the necessary fields
conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(fields, ","));
}
Expand All @@ -127,7 +130,7 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe
initTargetUri(conf);
conf.setOutputFormat(EsOutputFormat.class);
// define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
Settings set = SettingsManager.loadFrom(conf);
Settings set = SettingsManager.loadFrom(conf).merge(props);

InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class));
Expand All @@ -140,7 +143,7 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe
}

private void initTargetUri(JobConf conf) {
CascadingUtils.init(SettingsManager.loadFrom(conf), nodes, port, index, query);
CascadingUtils.init(SettingsManager.loadFrom(conf).merge(props), nodes, port, index, query);
if (log.isTraceEnabled()) {
log.trace("Initialized configuration " + HadoopCfgUtils.asProperties(conf));
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.hadoop.cascading;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
Expand All @@ -43,9 +44,9 @@ class EsHadoopTap extends Tap<JobConf, RecordReader, OutputCollector> {

private final String target;

public EsHadoopTap(String host, int port, String index, String query, Fields fields) {
public EsHadoopTap(String host, int port, String index, String query, Fields fields, Properties props) {
this.target = index;
setScheme(new EsHadoopScheme(host, port, index, query, fields));
setScheme(new EsHadoopScheme(host, port, index, query, fields, props));
}

@Override
Expand Down
Expand Up @@ -56,11 +56,12 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
private final String query;
private final String host;
private final int port;
private final Properties props;
private transient RestRepository client;

private boolean IS_ES_10;

EsLocalScheme(String host, int port, String index, String query, Fields fields) {
EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
this.resource = index;
this.query = query;
this.host = host;
Expand All @@ -69,14 +70,15 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
setSinkFields(fields);
setSourceFields(fields);
}
this.props = props;
}

@Override
public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object[], ScrollQuery> sourceCall) throws IOException {
super.sourcePrepare(flowProcess, sourceCall);

Object[] context = new Object[1];
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy());
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_10 = SettingsUtils.isEs10(settings);
Expand Down Expand Up @@ -106,7 +108,7 @@ public void sinkPrepare(FlowProcess<Properties> flowProcess, SinkCall<Object[],
super.sinkPrepare(flowProcess, sinkCall);

Object[] context = new Object[1];
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy());
Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
}
Expand All @@ -119,12 +121,12 @@ public void sourceConfInit(FlowProcess<Properties> flowProcess, Tap<Properties,
@Override
public void sinkConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, ScrollQuery, Object> tap, Properties conf) {
initClient(conf);
InitializationUtils.checkIndexExistence(SettingsManager.loadFrom(conf), client);
InitializationUtils.checkIndexExistence(SettingsManager.loadFrom(conf).merge(props), client);
}

private void initClient(Properties props) {
if (client == null) {
Settings settings = SettingsManager.loadFrom(props);
Settings settings = SettingsManager.loadFrom(props).merge(this.props);
CascadingUtils.init(settings, host, port, resource, query);

Log log = LogFactory.getLog(EsTap.class);
Expand Down
Expand Up @@ -53,9 +53,9 @@ class EsLocalTap extends Tap<Properties, ScrollQuery, Object> {
private static Log log = LogFactory.getLog(EsLocalTap.class);
private final String target;

public EsLocalTap(String host, int port, String resource, String query, Fields fields) {
public EsLocalTap(String host, int port, String resource, String query, Fields fields, Properties props) {
this.target = resource;
setScheme(new EsLocalScheme(host, port, resource, query, fields));
setScheme(new EsLocalScheme(host, port, resource, query, fields, props));
}

@Override
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/cascading/EsTap.java
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.hadoop.cascading;

import java.io.IOException;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class EsTap extends Tap<Object, Object, Object> {
private Fields fields;
private String host;
private int port;
private Properties props;

// TODO: add defaults fall back
public EsTap(String resource) {
Expand Down Expand Up @@ -81,11 +83,16 @@ public EsTap(String resource, String query, Fields fields) {
}

/**
 * Creates a tap without any tap-specific settings.
 * Delegates to the six-argument constructor, passing {@code null} as the tap settings.
 *
 * @param host     host value forwarded unchanged to the six-argument constructor
 * @param port     port value forwarded unchanged to the six-argument constructor
 * @param resource resource value forwarded unchanged to the six-argument constructor
 * @param query    query value forwarded unchanged to the six-argument constructor
 * @param fields   fields value forwarded unchanged to the six-argument constructor
 */
public EsTap(String host, int port, String resource, String query, Fields fields) {
this(host, port, resource, query, fields, null);
}

/**
 * Creates a tap and records every argument in the corresponding instance field.
 * No validation or copying is performed here; the values are only stored.
 *
 * @param host        stored in {@code this.host}
 * @param port        stored in {@code this.port}
 * @param resource    stored in {@code this.resource}
 * @param query       stored in {@code this.query}
 * @param fields      stored in {@code this.fields}
 * @param tapSettings per-tap settings, stored in {@code this.props}; may be {@code null}
 *                    (the five-argument constructor passes {@code null})
 */
public EsTap(String host, int port, String resource, String query, Fields fields, Properties tapSettings) {
this.resource = resource;
this.query = query;
this.host = host;
this.port = port;
this.fields = fields;
// kept as-is (no defensive copy), so the caller's Properties instance is shared with this tap
this.props = tapSettings;
}

@Override
Expand Down Expand Up @@ -181,7 +188,7 @@ private void initInnerTapIfNotSet(Object target, String hadoopTypeName) {
} catch (ClassNotFoundException e) {
runningInHadoop = false;
}
actualTap = (runningInHadoop ? new EsHadoopTap(host, port, resource, query, fields) : new EsLocalTap(host, port, resource, query, fields));
actualTap = (runningInHadoop ? new EsHadoopTap(host, port, resource, query, fields, props) : new EsLocalTap(host, port, resource, query, fields, props));
setScheme(actualTap.getScheme());
if (log.isDebugEnabled()) {
log.debug(String.format("Detected %s environment; initializing [%s]", (runningInHadoop ? "Hadoop" : "local"), actualTap.getClass().getSimpleName()));
Expand Down
Expand Up @@ -73,7 +73,7 @@ public Tap createTap(Scheme scheme, String path, SinkMode sinkMode, Properties p
int port = (StringUtils.hasText(portString) ? Integer.parseInt(portString) : -1);
String query = properties.getProperty("query");

return CascadingUtils.hadoopTap(host, port, path, query, ((EsScheme) scheme).fields);
return CascadingUtils.hadoopTap(host, port, path, query, ((EsScheme) scheme).fields, properties);
}

public Scheme createScheme(Fields fields, Properties properties) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -284,7 +284,7 @@ protected String getProperty(String name, String defaultValue) {
public abstract void setProperty(String name, String value);

public Settings merge(Properties properties) {
if (properties == null) {
if (properties == null || properties.isEmpty()) {
return this;
}

Expand Down

0 comments on commit b3a314d

Please sign in to comment.