Skip to content

Commit

Permalink
Allow complex objects within a doc to be used as script param
Browse files Browse the repository at this point in the history
Reuse the unpacking/json-ification of complex objects
Relates to elastic#223

(cherry picked from commit 8d5b0ea94a9dfd0ebf616beb9eb57e33bf69a6bc)
  • Loading branch information
costin committed Jul 31, 2014
1 parent 19ea9ca commit abb07e6
Show file tree
Hide file tree
Showing 16 changed files with 129 additions and 92 deletions.
Expand Up @@ -28,11 +28,11 @@ public class CascadingFieldExtractor extends ConstantFieldExtractor implements F

@SuppressWarnings({ "rawtypes" })
@Override
protected String extractField(Object target) {
protected Object extractField(Object target) {
if (target instanceof SinkCall) {
Object object = ((SinkCall) target).getOutgoingEntry().getObject(getFieldName());
if (object != null) {
return object.toString();
return object;
}
}
return NOT_FOUND;
Expand Down
Expand Up @@ -91,7 +91,7 @@ public int hashCode() {
* Generate the stream of bytes as hex pairs separated by ' '.
*/
public String toString() {
// NOTE(review): the next two lines are the before/after of a diff hunk, not real
// duplicate statements - only the second one (with the explicit 0 offset) survives
// after this commit. Also note the Javadoc above ("hex pairs separated by ' '")
// is stale: the code renders the bytes as a UTF string, not as hex.
return (ba != null ? StringUtils.asUTFString(ba.bytes(), ba.length()) : "");
return (ba != null ? StringUtils.asUTFString(ba.bytes(), 0, ba.length()) : "");
}


Expand Down
Expand Up @@ -35,7 +35,7 @@ public class HiveFieldExtractor extends ConstantFieldExtractor {
private String fieldName;

@Override
protected String extractField(Object target) {
protected Object extractField(Object target) {
if (target instanceof HiveType) {
HiveType type = (HiveType) target;
ObjectInspector inspector = type.getObjectInspector();
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.HdpBootstrap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
Expand Down Expand Up @@ -135,15 +136,15 @@ public AbstractMROldApiSaveTest(JobConf config, String indexPrefix) {
}


@Test
public void testBasicIndex() throws Exception {
    // Simplest write path: point the job at the target index/type and run it.
    JobConf jobConf = createJobConf();
    jobConf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/save");
    runJob(jobConf);
}

//@Test
@Test
public void testBasicIndexWithId() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_MAPPING_ID, "number");
Expand All @@ -152,7 +153,7 @@ public void testBasicIndexWithId() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testCreateWithId() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_WRITE_OPERATION, "create");
Expand All @@ -162,12 +163,12 @@ public void testCreateWithId() throws Exception {
runJob(conf);
}

@Test(expected = IOException.class)
public void testCreateWithIdShouldFailOnDuplicate() throws Exception {
    // Re-running the create-with-id job targets the same document ids again;
    // the second run must fail, surfaced by the job runner as an IOException.
    testCreateWithId();
}

//@Test(expected = IOException.class)
@Test(expected = IOException.class)
public void testUpdateWithoutId() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
Expand All @@ -176,7 +177,7 @@ public void testUpdateWithoutId() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpsertWithId() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
Expand All @@ -186,7 +187,7 @@ public void testUpsertWithId() throws Exception {
runJob(conf);
}

//@Test(expected = IOException.class)
@Test(expected = IOException.class)
public void testUpdateWithoutUpsert() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_WRITE_OPERATION, "update");
Expand All @@ -196,7 +197,7 @@ public void testUpdateWithoutUpsert() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpdateOnlyScript() throws Exception {
JobConf conf = createJobConf();
// use an existing id to allow the update to succeed
Expand All @@ -212,7 +213,7 @@ public void testUpdateOnlyScript() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpdateOnlyParamScript() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/createwithid");
Expand All @@ -227,7 +228,7 @@ public void testUpdateOnlyParamScript() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpdateOnlyParamJsonScript() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/createwithid");
Expand All @@ -242,7 +243,7 @@ public void testUpdateOnlyParamJsonScript() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpdateOnlyParamJsonScriptWithArray() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/createwithid");
Expand All @@ -269,7 +270,7 @@ public void testUpdateOnlyParamJsonScriptWithArray() throws Exception {
// runJob(conf);
}

//@Test
@Test
public void testUpdateOnlyParamJsonScriptWithArrayOnArrayField() throws Exception {
String docWithArray = "{ \"counter\" : 1 , \"tags\" : [\"an array\", \"with multiple values\"], \"more_tags\" : [ \"I am tag\"], \"even_more_tags\" : \"I am a tag too\" } ";
String index = indexPrefix + "mroldapi/createwitharray";
Expand All @@ -291,7 +292,7 @@ public void testUpdateOnlyParamJsonScriptWithArrayOnArrayField() throws Exceptio
}


//@Test
@Test
public void testUpsertScript() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/upsert-script");
Expand All @@ -303,7 +304,7 @@ public void testUpsertScript() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpsertParamScript() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/upsert-script-param");
Expand All @@ -317,7 +318,7 @@ public void testUpsertParamScript() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testUpsertParamJsonScript() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/upsert-script-json-param");
Expand Down Expand Up @@ -353,7 +354,7 @@ public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception {
}


//@Test(expected = EsHadoopIllegalArgumentException.class)
@Test(expected = EsHadoopIllegalArgumentException.class)
public void testIndexAutoCreateDisabled() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/non-existing");
Expand All @@ -362,7 +363,7 @@ public void testIndexAutoCreateDisabled() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testParentChild() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/child");
Expand All @@ -373,7 +374,7 @@ public void testParentChild() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testIndexPattern() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "/mroldapi/pattern-{number}");
Expand All @@ -382,7 +383,7 @@ public void testIndexPattern() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testIndexPatternWithFormatting() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/pattern-format-{@timestamp:YYYY-MM-dd}");
Expand All @@ -391,7 +392,7 @@ public void testIndexPatternWithFormatting() throws Exception {
runJob(conf);
}

//@Test
@Test
public void testIndexPatternWithFormattingAndId() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/pattern-format-{@timestamp:YYYY-MM-dd}-with-id");
Expand All @@ -401,7 +402,7 @@ public void testIndexPatternWithFormattingAndId() throws Exception {
}


////@Test
//@Test
public void testNested() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/nested");
Expand Down
Expand Up @@ -28,8 +28,8 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
//@Suite.SuiteClasses({ AbstractMROldApiSaveTest.class, AbstractMROldApiSearchTest.class, AbstractMRNewApiSaveTest.class, AbstractMRNewApiSearchTest.class })
@Suite.SuiteClasses({ AbstractMROldApiSaveTest.class })
@Suite.SuiteClasses({ AbstractMROldApiSaveTest.class, AbstractMROldApiSearchTest.class, AbstractMRNewApiSaveTest.class, AbstractMRNewApiSearchTest.class })
//@Suite.SuiteClasses({ AbstractMROldApiSaveTest.class })
public class MRSuite {
@ClassRule
public static ExternalResource resource = new LocalEs();
Expand Down
Expand Up @@ -23,14 +23,18 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.FieldWriter;
import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
import org.elasticsearch.hadoop.serialization.field.FieldExplainer;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors;
import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

Expand All @@ -43,12 +47,52 @@ abstract class AbstractBulkFactory implements BulkFactory {
private JsonFieldExtractors jsonExtractors;

protected Settings settings;
private ValueWriter<?> valueWriter;
private ValueWriter valueWriter;
// used when specifying an index pattern
private IndexExtractor indexExtractor;
private FieldExtractor idExtractor, parentExtractor, routingExtractor, versionExtractor, ttlExtractor,
timestampExtractor, paramsExtractor;

// Renders one extracted field value into a reusable byte buffer. Unlike the
// String-only writer it replaces, this version also accepts complex objects
// (maps/lists) and serializes them to JSON via the enclosing factory's
// valueWriter - which is what allows complex doc values as script params.
class FieldWriter {
final FieldExtractor extractor;
// scratch buffer, reused across write() calls to avoid per-record allocation
final BytesArray pad;

FieldWriter(FieldExtractor extractor) {
this(extractor, new BytesArray(64));
}

FieldWriter(FieldExtractor extractor, BytesArray pad) {
this.extractor = extractor;
this.pad = pad;
}

// Extracts the field from 'object' and renders it into 'pad'.
// Returns the shared pad, so callers must consume it before the next write().
BytesArray write(Object object) {
pad.reset();

Object value = extractor.field(object);
if (value == FieldExtractor.NOT_FOUND) {
// FieldExplainer (when implemented) gives a friendlier rendering of the failing record
String obj = (extractor instanceof FieldExplainer ? ((FieldExplainer) extractor).toString(object) : object.toString());
throw new EsHadoopIllegalArgumentException(String.format("[%s] cannot extract value from object [%s]", extractor, obj));
}
// common-case - constants
if (value instanceof String) {
pad.bytes(value.toString());
}
else {
// complex value: serialize it to JSON straight into 'pad'
JacksonJsonGenerator generator = new JacksonJsonGenerator(new FastByteArrayOutputStream(pad));
valueWriter.write(value, generator);
generator.flush();
generator.close();
// jackson add leading/trailing "" which are added down the pipeline so remove them
// NOTE(review): the -2/+1 trim assumes the serialized form is wrapped in
// quotes; for maps/arrays (no surrounding quotes) this would clip a byte of
// real JSON - confirm only quoted scalars reach this branch. Also, the
// generator is not closed if valueWriter.write throws; consider try/finally.
int size = pad.length();
pad.size(size - 2);
pad.offset(1);
}
return pad;
}
}


AbstractBulkFactory(Settings settings) {
this.settings = settings;
this.valueWriter = ObjectUtils.instantiate(settings.getSerializerValueWriterClassName(), settings);
Expand Down
Expand Up @@ -20,51 +20,22 @@

import java.util.Collection;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.serialization.builder.ContentBuilder;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.serialization.field.FieldExplainer;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory.FieldWriter;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.BytesRef;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
import org.elasticsearch.hadoop.util.StringUtils;

class TemplatedBulk implements BulkCommand {

// Pre-change implementation (removed by this commit): extracted values were
// assumed to already be Strings, so complex objects could not be used as
// script params. Its replacement lives in AbstractBulkFactory.
static class FieldWriter {
final FieldExtractor extractor;
// shared scratch buffer the extracted value is copied into
final BytesArray pad;

FieldWriter(FieldExtractor extractor) {
this(extractor, new BytesArray(64));
}

FieldWriter(FieldExtractor extractor, BytesArray pad) {
this.extractor = extractor;
this.pad = pad;
}

// Copies the extracted String field into 'pad'; fails fast when the field
// cannot be resolved from the incoming record.
BytesArray write(Object object) {
String value = extractor.field(object);
// identity check: NOT_FOUND is a sentinel instance, not a value to equals()-compare
if (value == FieldExtractor.NOT_FOUND) {
String obj = (extractor instanceof FieldExplainer ? ((FieldExplainer) extractor).toString(object) : object.toString());
throw new EsHadoopIllegalArgumentException(String.format("[%s] cannot extract value from object [%s]", extractor, obj));
}
if (StringUtils.hasText(value)) {
pad.bytes(value);
}
// NOTE(review): unlike the replacement, pad is never reset here - the caller
// appears responsible for resetting it between writes; confirm.
return pad;
}
}

private final Collection<Object> beforeObject;
private final Collection<Object> afterObject;

private BytesArray scratchPad = new BytesArray(1024);
private BytesRef ref = new BytesRef();

private final ValueWriter<?> valueWriter;
private final ValueWriter valueWriter;

TemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject, ValueWriter<?> valueWriter) {
this.beforeObject = beforeObject;
Expand Down
Expand Up @@ -28,8 +28,6 @@
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;



public abstract class AbstractIndexExtractor implements IndexExtractor, SettingsAware {

protected Settings settings;
Expand Down Expand Up @@ -90,17 +88,18 @@ private Object wrapWithFormatter(String format, final FieldExtractor createField
iformatter.configure(format);
return new FieldExtractor() {
@Override
public String field(Object target) {
return iformatter.format(createFieldExtractor.field(target));
public Object field(Object target) {
// hack: an index will always be a primitive so just call toString (instead of doing JSON parsing)
return iformatter.format(createFieldExtractor.field(target).toString());
}
};
}

private void append(StringBuilder sb, List<Object> list, Object target) {
for (Object object : list) {
if (object instanceof FieldExtractor) {
String field = ((FieldExtractor) object).field(target);
if (field == null) {
Object field = ((FieldExtractor) object).field(target);
if (field == NOT_FOUND) {
throw new EsHadoopIllegalArgumentException(String.format("Cannot find match for %s", pattern));
}
else {
Expand Down

0 comments on commit abb07e6

Please sign in to comment.