Skip to content

Commit

Permalink
Add dynamic index writes based on data fields
Browse files Browse the repository at this point in the history
  • Loading branch information
costin committed Mar 25, 2014
1 parent 9a01cd4 commit 90e0527
Show file tree
Hide file tree
Showing 18 changed files with 352 additions and 27 deletions.
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.hadoop.cfg;

import org.elasticsearch.hadoop.serialization.DefaultIndexFormat;

/**
* Class providing the various Configuration parameters used by the Elasticsearch Hadoop integration.
*/
Expand Down Expand Up @@ -130,6 +132,8 @@ public interface ConfigurationOptions {
String ES_MAPPING_TTL_EXTRACTOR_CLASS = "es.mapping.ttl.extractor.class";
String ES_MAPPING_TIMESTAMP = "es.mapping.timestamp";
String ES_MAPPING_TIMESTAMP_EXTRACTOR_CLASS = "es.mapping.timestamp.extractor.class";
String ES_MAPPING_INDEX_FORMAT_CLASS = "es.mapping.index.format.class";
String ES_MAPPING_DEFAULT_INDEX_FORMAT_CLASS = DefaultIndexFormat.class.getName();

/** Operation types */
String ES_WRITE_OPERATION = "es.write.operation";
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -183,6 +183,10 @@ public String getMappingTimestampExtractorClassName() {
return getProperty(ES_MAPPING_TIMESTAMP_EXTRACTOR_CLASS, getMappingDefaultClassExtractor());
}

/**
 * Returns the fully-qualified name of the {@code IndexFormat} implementation to use
 * for resolving index patterns, falling back to the built-in default when the
 * {@code es.mapping.index.format.class} property is not set.
 */
public String getMappingIndexFormatClassName() {
    String configured = getProperty(ES_MAPPING_INDEX_FORMAT_CLASS, ES_MAPPING_DEFAULT_INDEX_FORMAT_CLASS);
    return configured;
}

/**
 * Indicates whether write operations should be performed as upserts,
 * based on the {@code es.upsert.doc} configuration property.
 */
public boolean getUpsertDoc() {
    String raw = getProperty(ES_UPSERT_DOC, ES_UPSERT_DOC_DEFAULT);
    return Booleans.parseBoolean(raw);
}
Expand Down Expand Up @@ -271,6 +275,7 @@ public String getResourceRead() {
/**
 * Returns the resource (index/type) used for writes; when no write-specific
 * resource is configured, the generic resource setting is used as the fallback.
 */
public String getResourceWrite() {
    String fallback = getResource();
    return getProperty(ES_RESOURCE_WRITE, fallback);
}

String getTargetHosts() {
String hosts = getProperty(INTERNAL_ES_HOSTS);
return (StringUtils.hasText(hosts) ? hosts : getNodes());
Expand Down
45 changes: 40 additions & 5 deletions src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
Expand Up @@ -44,8 +44,10 @@
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.dto.Node;
import org.elasticsearch.hadoop.rest.dto.Shard;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.field.MapWritableFieldExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;

Expand Down Expand Up @@ -160,7 +162,8 @@ protected void init() throws IOException {
int currentInstance = detectCurrentInstance(cfg);

if (log.isTraceEnabled()) {
log.trace(String.format("ESRecordWriter instance [%s] initiating discovery of target shard...", currentInstance));
log.trace(String.format("EsRecordWriter instance [%s] initiating discovery of target shard...",
currentInstance));
}

Settings settings = SettingsManager.loadFrom(cfg).copy();
Expand All @@ -182,9 +185,25 @@ protected void init() throws IOException {
beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
beat.start();

client = new RestRepository(settings);
resource = settings.getResourceWrite();

// single index vs multi indices
IndexFormat iformat = ObjectUtils.instantiate(settings.getMappingIndexFormatClassName(), settings);
iformat.compile(resource);
if (iformat.hasPattern()) {
initMultiIndices(settings, currentInstance);
}
else {
initSingleIndex(settings, currentInstance);
}
}

private void initSingleIndex(Settings settings, int currentInstance) throws IOException {
if (log.isDebugEnabled()) {
log.debug(String.format("Resource [%s] resolves as a single index", resource));
}

client = new RestRepository(settings);
// create the index if needed
if (client.touch()) {
if (client.waitForYellow()) {
Expand Down Expand Up @@ -216,10 +235,27 @@ protected void init() throws IOException {
uri = SettingsUtils.nodes(settings).get(0);

if (log.isDebugEnabled()) {
log.debug(String.format("ESRecordWriter instance [%s] assigned to primary shard [%s] at address [%s]", currentInstance, chosenShard.getName(), uri));
log.debug(String.format("EsRecordWriter instance [%s] assigned to primary shard [%s] at address [%s]", currentInstance, chosenShard.getName(), uri));
}
}

/**
 * Initializes the writer for a multi-index (pattern-based) resource. Since the
 * target index is resolved per-document, no single shard can be pre-selected;
 * instead the writer talks directly to one target node.
 *
 * @param settings        copy of the job settings (mutated to pin the host)
 * @param currentInstance id of this writer instance (used for logging)
 * @throws IOException if the REST client cannot be created
 */
private void initMultiIndices(Settings settings, int currentInstance) throws IOException {
    if (log.isDebugEnabled()) {
        log.debug(String.format("Resource [%s] resolves as an index pattern", resource));
    }

    // use target node for indexing
    uri = SettingsUtils.nodes(settings).get(0);
    // override the global settings to communicate directly with the target node
    settings.setHosts(uri);

    if (log.isDebugEnabled()) {
        // FIX: the format string declares two specifiers but only [uri] was passed,
        // which throws MissingFormatArgumentException whenever debug logging is on;
        // currentInstance (previously unused) is the intended first argument.
        log.debug(String.format("EsRecordWriter instance [%s] assigned to [%s]", currentInstance, uri));
    }

    client = new RestRepository(settings);
}

private int detectCurrentInstance(Configuration conf) {
TaskID taskID = TaskID.forName(HadoopCfgUtils.getTaskId(conf));

Expand Down Expand Up @@ -294,8 +330,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf cfg) throws IOException
// NB: all changes to the config objects are discarded before the job is submitted if _the old MR api_ is used
private void init(Configuration cfg) throws IOException {
Settings settings = SettingsManager.loadFrom(cfg);
Assert.hasText(settings.getResourceWrite(),
String.format("No resource ['%s'] (index/query/location) specified", ES_RESOURCE));
Assert.hasText(settings.getResourceWrite(), String.format("No resource ['%s'] (index/query/location) specified", ES_RESOURCE));

// lazy-init
RestRepository client = null;
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/rest/Resource.java
Expand Up @@ -34,6 +34,7 @@ public class Resource {
private final String indexAndType;
private final String type;
private final String index;
private final String bulk;

public Resource(Settings settings, boolean read) {
String resource = (read ? settings.getResourceRead() : settings.getResourceWrite());
Expand Down Expand Up @@ -87,10 +88,13 @@ public Resource(Settings settings, boolean read) {
Assert.hasText(type, "No type found; expecting [index]/[type]");

indexAndType = index + "/" + type;

// check bulk
bulk = (indexAndType.contains("{") ? "/_bulk" : indexAndType + "/_bulk");
}

String bulk() {
return indexAndType + "/_bulk";
return bulk;
}

// https://github.com/elasticsearch/elasticsearch/issues/2726
Expand Down
@@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ObjectUtils;



/**
 * Base {@link IndexFormat} implementation that compiles an {@code index/type}
 * pattern (possibly containing {@code {field}} references) into an alternating
 * list of literal strings and {@link FieldExtractor}s, and resolves it against
 * a target document on demand.
 */
public abstract class AbstractIndexFormat implements IndexFormat, SettingsAware {

    protected Settings settings;
    protected String pattern;
    protected boolean hasPattern = false;
    // alternating literal String / FieldExtractor entries for the index part
    protected List<Object> index;
    // alternating literal String / FieldExtractor entries for the type part
    protected List<Object> type;

    @Override
    public void setSettings(Settings settings) {
        this.settings = settings;
    }

    @Override
    public void compile(String pattern) {
        this.pattern = pattern;
        // break it down into index/type
        String[] split = pattern.split("/");
        Assert.isTrue(!ObjectUtils.isEmpty(split), "invalid pattern given " + pattern);
        Assert.isTrue(split.length == 2, "invalid pattern given " + pattern);

        // a pattern is any resource containing at least one {...} marker
        hasPattern = pattern.contains("{") && pattern.contains("}");
        index = parse(split[0].trim());
        type = parse(split[1].trim());
    }

    /**
     * Splits the given string into an alternating list of literal prefixes and
     * field extractors, one extractor per {@code {field}} occurrence.
     */
    protected List<Object> parse(String string) {
        List<Object> template = new ArrayList<Object>();
        // peel "literal{field}" pairs off the front until no opening brace remains
        for (int open = string.indexOf("{"); open >= 0; open = string.indexOf("{")) {
            template.add(string.substring(0, open));
            int close = string.indexOf("}");
            // the braces must be ordered and enclose at least one character
            Assert.isTrue(close > open + 1, "Invalid pattern given " + string);
            template.add(createFieldExtractor(string.substring(open + 1, close)));
            string = string.substring(close + 1).trim();
        }
        template.add(string);
        return template;
    }

    // resolves each entry of the compiled template against the target document
    private void append(StringBuilder sb, List<Object> list, Object target) {
        for (Object part : list) {
            if (!(part instanceof FieldExtractor)) {
                sb.append(part.toString());
                continue;
            }
            String value = ((FieldExtractor) part).field(target);
            if (value == null) {
                throw new EsHadoopIllegalArgumentException(String.format("Cannot find match for %s", pattern));
            }
            sb.append(value);
        }
    }

    @Override
    public String field(Object target) {
        // emit the _index/_type JSON fragment used in the bulk request header
        StringBuilder sb = new StringBuilder();
        sb.append("\"_index\":\"");
        append(sb, index, target);
        sb.append("\",");
        sb.append("\"_type\":\"");
        append(sb, type, target);
        sb.append("\"");

        return sb.toString();
    }

    @Override
    public boolean hasPattern() {
        return hasPattern;
    }

    /** Creates the extractor responsible for resolving the given field name against a document. */
    protected abstract Object createFieldExtractor(String fieldName);
}
@@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
import org.elasticsearch.hadoop.util.ObjectUtils;

public class DefaultIndexFormat extends AbstractIndexFormat {

protected Object createFieldExtractor(String fieldName) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, fieldName);
return ObjectUtils.instantiate(settings.getMappingDefaultClassExtractor(), settings);
}
}
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import org.elasticsearch.hadoop.serialization.field.FieldExtractor;


/**
 * Produces an index name based on the given pattern and arguments (field values).
 * Extends {@link FieldExtractor} so the compiled pattern can be resolved against
 * a target document via {@code field(Object)}.
 */
public interface IndexFormat extends FieldExtractor {

    /**
     * Compiles the given index/type pattern (e.g. {@code index-{field}/type}),
     * preparing this instance for subsequent resolution calls.
     *
     * @param pattern index/type pattern, possibly containing {@code {field}} markers
     */
    void compile(String pattern);

    /**
     * Indicates whether the compiled string is a pattern (contains field markers) or not.
     *
     * @return true for a pattern, false otherwise
     */
    boolean hasPattern();
}

0 comments on commit 90e0527

Please sign in to comment.