Skip to content

Commit

Permalink
Improve handling of nested objects given as params
Browse files Browse the repository at this point in the history
relates elastic#223

(cherry picked from commit 921c34145e26572352cb1a16c1d307c32a89c9d9)
  • Loading branch information
costin committed Jul 31, 2014
1 parent 71a2ff8 commit 7a0ed64
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 39 deletions.
Expand Up @@ -127,7 +127,7 @@ public void testUpdateOnlyParamScript() throws Exception {
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id ");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number ");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");

Tap in = sourceTap();
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testUpsertParamScript() throws Exception {
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id ");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number ");

Tap in = sourceTap();
// use an existing id to allow the update to succeed
Expand Down
Expand Up @@ -26,6 +26,8 @@
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
Expand Down Expand Up @@ -54,6 +56,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import static org.junit.Assume.*;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
public class AbstractMROldApiSaveTest {
Expand Down Expand Up @@ -88,6 +92,18 @@ public void map(Object key, Object value, OutputCollector output, Reporter repor
output.collect(key, WritableUtils.toWritable(entry));
}
}

/**
 * Mapper that disregards the record it is handed and always emits the same
 * single-entry map ({@code "key" -> "value"}) under a freshly created
 * {@link LongWritable} key.
 */
public static class ConstantMapper extends MapReduceBase implements Mapper {

    @Override
    public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
        Text constantKey = new Text("key");
        Text constantValue = new Text("value");

        // the incoming key/value are intentionally unused - the output never varies
        MapWritable document = new MapWritable();
        document.put(constantKey, constantValue);

        output.collect(new LongWritable(), document);
    }
}


public static class SplittableTextInputFormat extends TextInputFormat {

@Override
Expand Down Expand Up @@ -136,6 +152,18 @@ public AbstractMROldApiSaveTest(JobConf config, String indexPrefix) {
}


/**
 * Runs a job that uses {@link ConstantMapper} and writes to the
 * {@code mroldapi/constant} resource. The test is skipped when the job is
 * configured with {@code ES_INPUT_JSON} set to {@code "true"}.
 *
 * @throws Exception if the job configuration or execution fails
 */
@Test
public void testNoInput() throws Exception {
    JobConf conf = createJobConf();

    // use only when dealing with constant input
    // constant-first equals: an unset ES_INPUT_JSON yields null from conf.get(), which
    // must count as "not JSON" instead of failing the test with a NullPointerException
    assumeFalse("true".equals(conf.get(ConfigurationOptions.ES_INPUT_JSON)));
    conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/constant");
    conf.setMapperClass(ConstantMapper.class);

    runJob(conf);
}

@Test
public void testBasicIndex() throws Exception {
JobConf conf = createJobConf();
Expand Down Expand Up @@ -348,7 +376,7 @@ public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception {
conf.set(ConfigurationOptions.ES_MAPPING_ID, "<1>");
conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT, "ctx._source.tags = update_tags");
conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel");
conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, "update_tags:tags");
conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, (conf.get(ConfigurationOptions.ES_INPUT_JSON).equals("true") ? "update_tags:name" :"update_tags:list"));

runJob(conf);
}
Expand Down
Expand Up @@ -44,6 +44,10 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import static org.junit.Assert.*;

import static org.hamcrest.Matchers.*;

@RunWith(Parameterized.class)
public class AbstractMROldApiSearchTest {

Expand Down Expand Up @@ -141,6 +145,15 @@ public void testDynamicPatternWithFormat() throws Exception {
Assert.assertTrue(RestUtils.exists("mroldapi/pattern-format-2945-10-06"));
}

/**
 * Checks that the document {@code mroldapi/createwitharrayupsert/1} exists and
 * that its content does not contain the text {@code ArrayWritable@}.
 */
@Test
public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception {
    String docPath = "mroldapi/createwitharrayupsert/1";
    Assert.assertTrue(RestUtils.exists(docPath));

    String document = RestUtils.get(docPath);
    System.out.println(document);

    // "ArrayWritable@" in the stored document would look like a default Object#toString()
    // of the array rather than its elements - NOTE(review): confirm against the writer
    assertThat(document, not(containsString("ArrayWritable@")));
}

//@Test
public void testNested() throws Exception {
JobConf conf = createJobConf();
Expand Down
11 changes: 11 additions & 0 deletions mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java
Expand Up @@ -47,6 +47,10 @@ public Response execute(Request.Method method, String path, ByteSequence buffer)
return super.execute(method, path, buffer);
}

/**
 * Executes a GET request against the given path and converts the outcome of
 * {@code execute(...)} to a String through {@code IOUtils.asString}.
 *
 * @param index path to request
 * @return the result of the request as a String
 * @throws IOException declared for the conversion of the result to a String
 */
public String get(String index) throws IOException {
return IOUtils.asString(execute(Request.Method.GET, index));
}

/**
 * Executes a POST request against the given path, sending {@code buffer} wrapped
 * in a {@link BytesArray}, and returns the response body as a String.
 *
 * @param index path to post to
 * @param buffer raw bytes used as the request body
 * @return the response body converted through {@code IOUtils.asString}
 * @throws IOException declared for the conversion of the body to a String
 */
public String post(String index, byte[] buffer) throws IOException {
    Response response = execute(Request.Method.POST, index, new BytesArray(buffer));
    return IOUtils.asString(response.body());
}
Expand Down Expand Up @@ -74,6 +78,13 @@ public static Field getMapping(String index) throws Exception {
return parseField;
}

/**
 * Issues a GET for the given path through a short-lived {@code ExtendedRestClient}
 * and returns what the client's {@code get(String)} produces.
 *
 * @param index path to request
 * @return the String returned by the client for that path
 * @throws Exception if the request fails
 */
public static String get(String index) throws Exception {
    ExtendedRestClient rc = new ExtendedRestClient();
    try {
        return rc.get(index);
    } finally {
        // close the client even when the request throws, so a failing call does not leak it
        rc.close();
    }
}

/**
 * Reads the classpath resource at {@code location} (resolved through the class loader
 * of {@code RestUtils}) and delegates to the two-argument {@code putMapping} overload
 * that accepts the content produced by {@code TestUtils.fromInputStream}.
 *
 * @param index target passed through to the delegate overload
 * @param location classpath location of the mapping content
 * @throws Exception if reading the resource or the delegate call fails
 */
public static void putMapping(String index, String location) throws Exception {
    ClassLoader loader = RestUtils.class.getClassLoader();
    putMapping(index, TestUtils.fromInputStream(loader.getResourceAsStream(location)));
}
Expand Down
Expand Up @@ -29,6 +29,8 @@

public abstract class ParsingUtils {

public static final String NOT_FOUND = "(not found)";

/**
* Seeks the field with the given name in the stream and positions (and returns) the parser to the next available token (value or not).
* Returns null if no token is found.
Expand Down Expand Up @@ -139,7 +141,7 @@ public static List<String> values(Parser parser, String... paths) {

List<String> matches = new ArrayList<String>();
for (Matcher matcher : matchers) {
matches.add(matcher.matched ? matcher.value.toString() : null);
matches.add(matcher.matched ? (matcher.value != null ? matcher.value.toString() : StringUtils.EMPTY) : NOT_FOUND);
}

return matches;
Expand Down
Expand Up @@ -32,8 +32,10 @@
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors;
import org.elasticsearch.hadoop.serialization.field.WithoutQuotes;
import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.BytesArrayPool;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;
Expand All @@ -53,45 +55,75 @@ abstract class AbstractBulkFactory implements BulkFactory {
private FieldExtractor idExtractor, parentExtractor, routingExtractor, versionExtractor, ttlExtractor,
timestampExtractor, paramsExtractor;

static final BytesArray QUOTE = new BytesArray("\"");

class FieldWriter {
final FieldExtractor extractor;
final BytesArray pad;
final boolean addQuotesIfNecessary;
final BytesArrayPool pool = new BytesArrayPool();

FieldWriter(FieldExtractor extractor) {
this(extractor, new BytesArray(64));
}

FieldWriter(FieldExtractor extractor, BytesArray pad) {
this.extractor = extractor;
this.pad = pad;
addQuotesIfNecessary = (extractor instanceof WithoutQuotes);
}

BytesArray write(Object object) {
pad.reset();
BytesArrayPool write(Object object) {
pool.reset();

Object value = extractor.field(object);
if (value == FieldExtractor.NOT_FOUND) {
String obj = (extractor instanceof FieldExplainer ? ((FieldExplainer) extractor).toString(object) : object.toString());
throw new EsHadoopIllegalArgumentException(String.format("[%s] cannot extract value from object [%s]", extractor, obj));
}

if (value instanceof List) {
List list = (List) value;
for (int i = 0; i < list.size() - 1; i++) {
doWrite(list.get(i), false);
}
//
doWrite(list.get(list.size() - 1), true);
}
// weird if/else to save one collection/iterator instance
else {
doWrite(value, true);
}

return pool;
}

void doWrite(Object value, boolean lookForQuotes) {
// common-case - constants
if (value instanceof String) {
pad.bytes(value.toString());
if (value instanceof String || jsonInput) {
String val = value.toString();
if (lookForQuotes && addQuotesIfNecessary) {
if (val.startsWith("[") || val.startsWith("{")) {
pool.get().bytes(val);
}
else {
pool.get().bytes(QUOTE);
pool.get().bytes(val);
pool.get().bytes(QUOTE);
}
}
else {
pool.get().bytes(val);
}
}
else {
JacksonJsonGenerator generator = new JacksonJsonGenerator(new FastByteArrayOutputStream(pad));
BytesArray ba = pool.get();
JacksonJsonGenerator generator = new JacksonJsonGenerator(new FastByteArrayOutputStream(ba));
valueWriter.write(value, generator);
generator.flush();
generator.close();

// jackson likely will add leading/trailing "" which are added down the pipeline so remove them
// however that's not mandatory in case the source is a number (instead of a string)
if (pad.bytes()[pad.offset()] == '"') {
int size = pad.length();
pad.size(Math.max(0, size - 2));
pad.offset(1);
if ((lookForQuotes && !addQuotesIfNecessary) && ba.bytes()[ba.offset()] == '"') {
ba.size(Math.max(0, ba.length() - 2));
ba.offset(1);
}
}
return pad;
}
}

Expand Down Expand Up @@ -268,7 +300,7 @@ private List<Object> compact(List<Object> list) {
stringAccumulator.setLength(0);
lastString = null;
}
compacted.add(new FieldWriter((FieldExtractor) object));
compacted.add(createFieldWriter((FieldExtractor) object));
}
else {
String str = object.toString();
Expand All @@ -287,6 +319,10 @@ private List<Object> compact(List<Object> list) {
return compacted;
}

/**
 * Creates the writer object wrapped around the given extractor. Being
 * {@code protected} and non-final, it can be overridden by subclasses;
 * this implementation returns a new {@link FieldWriter}.
 *
 * @param extractor extractor the writer will use
 * @return a new {@link FieldWriter} for {@code extractor}
 */
protected Object createFieldWriter(FieldExtractor extractor) {
return new FieldWriter(extractor);
}

protected void writeBeforeObject(List<Object> pieces) {
startHeader(pieces);

Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.hadoop.serialization.field;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,22 +29,35 @@
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.StringUtils;

public abstract class AbstractDefaultParamsExtractor implements FieldExtractor, SettingsAware {
public abstract class AbstractDefaultParamsExtractor implements FieldExtractor, SettingsAware, FieldExplainer, WithoutQuotes {

private Map<String, FieldExtractor> params = new LinkedHashMap<String, FieldExtractor>();
protected Settings settings;
// field explainer saved in case of a failure for diagnostics
private FieldExtractor lastFailingFieldExtractor;

@Override
public String field(Object target) {
StringBuilder sb = new StringBuilder();
public Object field(Object target) {
List<Object> list = new ArrayList<Object>(params.size());
for (Entry<String, FieldExtractor> entry : params.entrySet()) {
sb.append("\"");
sb.append(entry.getKey());
sb.append("\":\"");
sb.append(entry.getValue().field(target));
sb.append("\",");
list.add("\"");
list.add(entry.getKey());
list.add("\":");
Object field = entry.getValue().field(target);
if (field == FieldExtractor.NOT_FOUND) {
lastFailingFieldExtractor = entry.getValue();
return FieldExtractor.NOT_FOUND;
}
list.add(field);
list.add(",");
}
return sb.substring(0, sb.length() - 1);
list.remove(list.size() - 1);
return list;
}

/**
 * Describes {@code target} for diagnostics: delegates to the last failing extractor
 * when that extractor is a {@link FieldExplainer}, otherwise falls back to
 * {@code target.toString()}.
 */
@Override
public String toString(Object target) {
    if (lastFailingFieldExtractor instanceof FieldExplainer) {
        FieldExplainer explainer = (FieldExplainer) lastFailingFieldExtractor;
        return explainer.toString(target);
    }
    return target.toString();
}

@Override
Expand All @@ -60,5 +74,14 @@ public void setSettings(Settings settings) {
}
}

/**
 * Returns the description of the last failing extractor if one has been recorded;
 * otherwise the simple class name followed by the configured parameter names.
 */
@Override
public String toString() {
    return (lastFailingFieldExtractor == null
            ? String.format("%s for fields [%s]", getClass().getSimpleName(), params.keySet())
            : lastFailingFieldExtractor.toString());
}

protected abstract FieldExtractor createFieldExtractor(String fieldName);
}
Expand Up @@ -20,5 +20,5 @@

public interface FieldExplainer {

public String toString(Object field);
String toString(Object field);
}
Expand Up @@ -26,7 +26,7 @@
*/
public interface FieldExtractor {

public String NOT_FOUND = "(not found)";
public Object NOT_FOUND = new Object();

/**
* Returns the associated JSON representation for the given target.
Expand Down

0 comments on commit 7a0ed64

Please sign in to comment.