Split global resource into read/write targets
Improve the configuration to allow for dedicated read and write resources, as opposed to
a single, unified resource used for both. This allows different ES
indices to be used in the same job, one as a source and the other as
a sink.

'es.resource' is still supported and used as a fallback.
Higher-level abstractions, such as Cascading, Hive and Pig, set the
proper property automatically.

fix #156
fix #45
fix #26
costin committed Mar 10, 2014
1 parent b9f05e6 commit 68cd50e
Showing 18 changed files with 136 additions and 75 deletions.
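
For a sense of how the split surfaces to a plain MapReduce job, here is a minimal, hedged sketch. The property keys es.resource.read and es.resource.write are assumptions inferred from the setResourceRead/setResourceWrite setters in the diff below, and the index names are placeholders.

import org.apache.hadoop.mapred.JobConf;

public class ReadWriteResourceExample {
    public static void main(String[] args) {
        JobConf conf = new JobConf();
        // dedicated source and sink resources (assumed keys)
        conf.set("es.resource.read", "logs-2014/event");
        conf.set("es.resource.write", "summaries-2014/event");
        // the legacy unified key is still honored, as a fallback for both
        conf.set("es.resource", "fallback-index/event");
    }
}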
@@ -127,7 +127,7 @@ static Properties extractOriginalProperties(Properties copy) {
return ReflectionUtils.getField(field, copy);
}

static Settings init(Settings settings, String nodes, int port, String resource, String query) {
static Settings init(Settings settings, String nodes, int port, String resource, String query, boolean read) {
if (StringUtils.hasText(nodes)) {
settings.setHosts(nodes);
}
@@ -141,7 +141,12 @@ static Settings init(Settings settings, String nodes, int port, String resource,
}

if (StringUtils.hasText(resource)) {
settings.setResource(resource);
if (read) {
settings.setResourceRead(resource);
}
else {
settings.setResourceWrite(resource);
}
}

return settings;
@@ -89,7 +89,7 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
context[0] = sourceCall.getInput().createKey();
context[1] = sourceCall.getInput().createValue();
// as the tuple _might_ vary (some objects might be missing), we use a map rather than a collection
Settings settings = loadSettings(flowProcess.getConfigCopy());
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_10 = SettingsUtils.isEs10(settings);
@@ -108,7 +108,7 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out

Object[] context = new Object[1];
// the tuple will be fixed, so we can just use a collection/index
Settings settings = loadSettings(flowProcess.getConfigCopy());
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
IS_ES_10 = SettingsUtils.isEs10(settings);
@@ -122,10 +122,9 @@ public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out

@Override
public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {

initTargetUri(conf);
conf.setInputFormat(EsInputFormat.class);
Settings set = loadSettings(conf);
Settings set = loadSettings(conf, true);

Collection<String> fields = CascadingUtils.fieldToAlias(set, getSourceFields());
// load only the necessary fields
conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(fields, ","));
@@ -137,31 +136,27 @@ public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, Record

@Override
public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
initTargetUri(conf);

conf.setOutputFormat(EsOutputFormat.class);
// define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
Settings set = loadSettings(conf);
Settings set = loadSettings(conf, false);

InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setBytesConverterIfNeeded(set, WritableBytesConverter.class, LogFactory.getLog(EsTap.class));

// NB: we need to set this property even though it is not being used - and since a URI causes problems, use only the resource/file
//conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResourceWrite());
HadoopCfgUtils.setOutputCommitterClass(conf, EsOutputFormat.ESOldAPIOutputCommitter.class.getName());

if (log.isTraceEnabled()) {
log.trace("Initialized (sink) configuration " + HadoopCfgUtils.asProperties(conf));
}
}

private void initTargetUri(JobConf conf) {
CascadingUtils.init(SettingsManager.loadFrom(conf).merge(props), nodes, port, index, query);
}

private Settings loadSettings(Object source) {
return CascadingUtils.init(SettingsManager.loadFrom(source).merge(props), nodes, port, index, query);
private Settings loadSettings(Object source, boolean read) {
return CascadingUtils.init(SettingsManager.loadFrom(source).merge(props), nodes, port, index, query, read);
}

@SuppressWarnings("unchecked")
@@ -115,19 +115,19 @@ public void sinkPrepare(FlowProcess<Properties> flowProcess, SinkCall<Object[],

@Override
public void sourceConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, ScrollQuery, Object> tap, Properties conf) {
initClient(conf);
initClient(conf, true);
}

@Override
public void sinkConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, ScrollQuery, Object> tap, Properties conf) {
initClient(conf);
initClient(conf, false);
InitializationUtils.checkIndexExistence(SettingsManager.loadFrom(conf).merge(props), client);
}

private void initClient(Properties props) {
private void initClient(Properties props, boolean read) {
if (client == null) {
Settings settings = SettingsManager.loadFrom(props).merge(this.props);
CascadingUtils.init(settings, host, port, resource, query);
CascadingUtils.init(settings, host, port, resource, query, read);

Log log = LogFactory.getLog(EsTap.class);
InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, log);
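Because both Cascading schemes now pass the read flag automatically (true on the source paths, false on the sink paths), one flow can read from one index and write to another. The following is an illustrative sketch only; the EsTap constructor arguments and the surrounding Cascading wiring are assumptions.

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import org.elasticsearch.hadoop.cascading.EsTap;

public class ReadWriteTapExample {
    public static void main(String[] args) {
        // hedged sketch: each tap resolves its own read/write resource
        Tap in = new EsTap("logs-2014/event", "?q=error"); // read resource
        Tap out = new EsTap("errors-2014/event");          // write resource
        Flow flow = new HadoopFlowConnector().connect(in, out, new Pipe("copy"));
        flow.complete();
    }
}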
5 changes: 0 additions & 5 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -208,11 +208,6 @@ public Settings setPort(int port) {
return this;
}

public Settings setResource(String index) {
setProperty(ES_RESOURCE, index);
return this;
}

public Settings setResourceRead(String index) {
setProperty(ES_RESOURCE_READ, index);
return this;
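With setResource gone, reads and writes resolve their resource through the dedicated getters. Below is a hedged sketch of how the read-side lookup would behave, assuming the getProperty helper and the ES_RESOURCE* keys used elsewhere in this file; it is not the verbatim implementation.

// hedged sketch: a dedicated read resource wins; otherwise the legacy
// unified 'es.resource' value is used as the fallback
public String getResourceRead() {
    String explicit = getProperty(ES_RESOURCE_READ);
    return (explicit != null ? explicit : getProperty(ES_RESOURCE));
}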
30 changes: 18 additions & 12 deletions src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java
@@ -76,25 +76,37 @@ public HiveMetaHook getMetaHook() {

@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
init(tableDesc);
init(tableDesc, true);
}

@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
init(tableDesc);
init(tableDesc, false);
}

private void init(TableDesc tableDesc) {
private void init(TableDesc tableDesc, boolean read) {
Configuration cfg = getConf();
Settings settings = SettingsManager.loadFrom(cfg).merge(tableDesc.getProperties());

// NB: ESSerDe is already initialized at this stage but should still have a reference to the same cfg object
// NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes

InitializationUtils.checkIdForOperation(settings);
InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log);
if (read) {
InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
// set read resource
settings.setResourceRead(settings.getResourceRead());
}
else {
InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log);
// replace the default committer when using the old API
HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName());
// set write resource
settings.setResourceWrite(settings.getResourceWrite());
}

InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
try {
InitializationUtils.discoverEsVersion(settings, log);
@@ -103,12 +115,6 @@ private void init(TableDesc tableDesc) {
}


settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS,
StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));

// replace the default committer when using the old API
HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName());

Assert.hasText(tableDesc.getProperties().getProperty(TABLE_LOCATION), String.format(
"no table location [%s] declared by Hive resulting in abnormal execution;", TABLE_LOCATION));
}
10 changes: 5 additions & 5 deletions src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -417,22 +417,22 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
String savedSettings = settings.save();

RestRepository client = new RestRepository(settings);
boolean indexExists = client.indexExists();
boolean indexExists = client.indexExists(true);
Map<Shard, Node> targetShards = null;

if (!indexExists) {
if (settings.getIndexReadMissingAsEmpty()) {
log.info(String.format("Index [%s] missing - treating it as empty", settings.getResource()));
log.info(String.format("Index [%s] missing - treating it as empty", settings.getResourceRead()));
targetShards = Collections.emptyMap();
}
else {
client.close();
throw new EsHadoopIllegalArgumentException(
String.format("Index [%s] missing and settings [%s] is set to false", settings.getResource(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL));
String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL));
}
}
else {
targetShards = client.getTargetShards();
targetShards = client.getReadTargetShards();
if (log.isTraceEnabled()) {
log.trace("Creating splits for shards " + targetShards);
}
@@ -443,7 +443,7 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
Field mapping = client.getMapping();
//TODO: implement this more efficiently
savedMapping = IOUtils.serializeToBase64(mapping);
log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getResource()));
log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getResourceRead()));
}

client.close();
@@ -192,7 +192,7 @@ protected void init() throws IOException {
}
}

Map<Shard, Node> targetShards = client.getTargetPrimaryShards();
Map<Shard, Node> targetShards = client.getWriteTargetPrimaryShards();
client.close();

List<Shard> orderedShards = new ArrayList<Shard>(targetShards.keySet());
11 changes: 7 additions & 4 deletions src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java
@@ -143,11 +143,14 @@ public void checkSchema(ResourceSchema s) throws IOException {

@Override
public void setStoreLocation(String location, Job job) throws IOException {
init(location, job);
init(location, job, false);
}

private void init(String location, Job job) {
Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties).setResource(location);
private void init(String location, Job job, boolean read) {
Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties);

settings = (read ? settings.setResourceRead(location) : settings.setResourceWrite(location));

boolean changed = false;
InitializationUtils.checkIdForOperation(settings);

@@ -222,7 +225,7 @@ public void storeSchema(ResourceSchema schema, String location, Job job) throws
//
@SuppressWarnings("unchecked")
public void setLocation(String location, Job job) throws IOException {
init(location, job);
init(location, job, true);

Configuration cfg = job.getConfiguration();

@@ -80,6 +80,15 @@ public static boolean discoverNodesIfNeeded(Settings settings, Log log) throws I
}

public static String discoverEsVersion(Settings settings, Log log) throws IOException {
String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
if (StringUtils.hasText(version)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Elasticsearch version [%s] already present in configuration; skipping discovery", version));
}

return version;
}

RestClient bootstrap = new RestClient(settings);
// first get ES version
try {
@@ -101,10 +110,10 @@ public static void checkIndexExistence(Settings settings, RestRepository client)
client = new RestRepository(settings);
}
try {
if (!client.indexExists()) {
if (!client.indexExists(false)) {
client.close();
throw new EsHadoopIllegalArgumentException(String.format("Target index [%s] does not exist and auto-creation is disabled [setting '%s' is '%s']",
settings.getResource(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
}
} catch (IOException ex) {
throw new EsHadoopIllegalStateException("Cannot check index existence", ex);
@@ -139,13 +148,12 @@ public static <T> void saveSchemaIfNeeded(Object conf, ValueWriter<T> schemaWrit

if (settings.getIndexAutoCreate()) {
RestRepository client = new RestRepository(settings);
if (!client.indexExists()) {
if (!client.indexExists(false)) {
if (schemaWriter == null) {
log.warn(String.format("No mapping found [%s] and no schema found; letting Elasticsearch perform auto-mapping...", settings.getResourceWrite()));
}
else {
log.info(String.format("No mapping found [%s], creating one based on given schema",
settings.getResource()));
log.info(String.format("No mapping found [%s], creating one based on given schema", settings.getResourceWrite()));
ContentBuilder builder = ContentBuilder.generate(schemaWriter).value(schema).flush();
BytesArray content = ((FastByteArrayOutputStream) builder.content()).bytes();
builder.close();
@@ -54,7 +54,7 @@ public class QueryBuilder {
private String fields;

QueryBuilder(Settings settings) {
this.resource = new Resource(settings);
this.resource = new Resource(settings, true);
IS_ES_10 = SettingsUtils.isEs10(settings);
String query = settings.getQuery();
if (!StringUtils.hasText(query)) {
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/hadoop/rest/Resource.java
@@ -64,7 +64,7 @@ public Resource(Settings settings, boolean read) {
Assert.isTrue(index >= 0 && index < resource.length() - 1, errorMessage);
resource = resource.substring(0, index);

settings.setResource(resource);
settings.setProperty(ConfigurationOptions.ES_RESOURCE, resource);
settings.setQuery(query);
}
}
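
For context, the constructor above strips an inline query from a URI-style resource before writing the bare resource back to 'es.resource'. A hedged illustration of the outcome follows; the exact delimiter handling is simplified and assumed.

// hedged sketch, not the actual Resource logic
String raw = "index/type/_search?q=me*";
int cut = raw.indexOf("_search");                // assumed split point
String bare = raw.substring(0, cut - 1);         // -> "index/type"
String query = raw.substring(raw.indexOf('?'));  // -> "?q=me*"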
