make json / bytes conversion configurable
relates to elastic#9
costin committed Jan 17, 2014
1 parent e127d09 commit 0b7e7f4
Showing 8 changed files with 41 additions and 34 deletions.
@@ -86,10 +86,13 @@ public interface ConfigurationOptions {
     /** Serialization settings */
 
     /** Value writer - setup automatically; can be overridden for custom types */
-    String ES_SERIALIZATION_WRITER_CLASS = "es.ser.writer.class";
+    String ES_SERIALIZATION_WRITER_VALUE_CLASS = "es.ser.writer.value.class";
 
+    /** JSON/Bytes writer - setup automatically; can be overridden for custom types */
+    String ES_SERIALIZATION_WRITER_BYTES_CLASS = "es.ser.writer.bytes.class";
+
     /** Value reader - setup automatically; can be overridden for custom types */
-    String ES_SERIALIZATION_READER_CLASS = "es.ser.reader.class";
+    String ES_SERIALIZATION_READER_VALUE_CLASS = "es.ser.reader.value.class";
 
     /** Input options **/
     String ES_INPUT_JSON = "es.input.json";
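The new `es.ser.writer.bytes.class` option makes the JSON-to-bytes conversion pluggable rather than hard-coded. Below is a minimal sketch (not part of the commit) of how a job could opt in through a plain Hadoop `Configuration`; the property keys come from the hunk above, while `example.MyBytesWriter` is a hypothetical custom implementation:

```java
import org.apache.hadoop.conf.Configuration;

public class JsonJobSetup {
    // Sketch only: wires the new serialization option into a job config.
    public static Configuration configure() {
        Configuration cfg = new Configuration();
        cfg.set("es.input.json", "true");                              // input records are already JSON
        cfg.set("es.ser.writer.bytes.class", "example.MyBytesWriter"); // hypothetical custom BytesWriter
        return cfg;
    }
}
```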
8 changes: 6 additions & 2 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -109,11 +109,15 @@ public String getScrollFields() {
     }
 
     public String getSerializerValueWriterClassName() {
-        return getProperty(ES_SERIALIZATION_WRITER_CLASS);
+        return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
     }
 
+    public String getSerializerBytesWriterClassName() {
+        return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
+    }
+
     public String getSerializerValueReaderClassName() {
-        return getProperty(ES_SERIALIZATION_READER_CLASS);
+        return getProperty(ES_SERIALIZATION_READER_VALUE_CLASS);
     }
 
     public boolean getIndexAutoCreate() {
@@ -160,6 +160,7 @@ protected void init() throws IOException {
         Settings settings = SettingsManager.loadFrom(cfg);
 
         InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, log);
+        InitializationUtils.setBytesWriterIfNeeded(settings, WritableBytesWriter.class, log);
         InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, log);
         InitializationUtils.discoverNodesIfNeeded(settings, log);
         // pick the host based on id
@@ -29,6 +29,7 @@
 import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.cfg.SettingsManager;
+import org.elasticsearch.hadoop.serialization.BytesWriter;
 import org.elasticsearch.hadoop.serialization.builder.ContentBuilder;
 import org.elasticsearch.hadoop.serialization.builder.NoOpValueWriter;
 import org.elasticsearch.hadoop.serialization.builder.ValueReader;
@@ -152,7 +153,7 @@ public static boolean setValueWriterIfNotSet(Settings settings, Class<? extends
                     logger.debug(String.format("Elasticsearch input marked as JSON; bypassing serialization through [%s] instead of [%s]", name, clazz));
                 }
             }
-            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, name);
+            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, name);
             if (logger.isDebugEnabled()) {
                 logger.debug(String.format("Using pre-defined writer serializer [%s] as default", settings.getSerializerValueWriterClassName()));
             }
@@ -162,10 +163,23 @@ public static boolean setValueWriterIfNotSet(Settings settings, Class<? extends
         return false;
     }
 
+    public static boolean setBytesWriterIfNeeded(Settings settings, Class<? extends BytesWriter> clazz, Log log) {
+        if (settings.getInputAsJson() && !StringUtils.hasText(settings.getSerializerBytesWriterClassName())) {
+            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_BYTES_CLASS, clazz.getName());
+            Log logger = (log != null ? log : LogFactory.getLog(clazz));
+            if (logger.isDebugEnabled()) {
+                logger.debug(String.format("JSON input specified; using pre-defined bytes/json serializer [%s] as default", settings.getSerializerBytesWriterClassName()));
+            }
+            return true;
+        }
+
+        return false;
+    }
+
     public static boolean setValueReaderIfNotSet(Settings settings, Class<? extends ValueReader> clazz, Log log) {
 
         if (!StringUtils.hasText(settings.getSerializerValueReaderClassName())) {
-            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_READER_CLASS, clazz.getName());
+            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_READER_VALUE_CLASS, clazz.getName());
             Log logger = (log != null ? log : LogFactory.getLog(clazz));
             if (logger.isDebugEnabled()) {
                 logger.debug(String.format("Using pre-defined reader serializer [%s] as default", settings.getSerializerValueReaderClassName()));
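Note the asymmetry with `setValueWriterIfNotSet`: the new `setBytesWriterIfNeeded` installs its default only when `es.input.json` is enabled and no bytes writer has been configured, so an explicit user setting always wins. A hedged sketch of that precedence (the package locations of `InitializationUtils` and `WritableBytesWriter` are assumptions, and `example.MyBytesWriter` is hypothetical):

```java
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
// packages below are assumptions -- adjust to wherever these classes live
import org.elasticsearch.hadoop.mr.WritableBytesWriter;
import org.elasticsearch.hadoop.rest.InitializationUtils;

public class BytesWriterPrecedence {
    // Sketch only: an explicit user choice short-circuits the default.
    static void wire(Settings settings) {
        settings.setProperty(ConfigurationOptions.ES_INPUT_JSON, "true");
        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_BYTES_CLASS, "example.MyBytesWriter");

        // Returns false and leaves the user's writer in place, because the
        // bytes writer property already has text (the log is never consulted).
        boolean installed = InitializationUtils.setBytesWriterIfNeeded(settings, WritableBytesWriter.class, null);
        assert !installed;
    }
}
```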
@@ -157,7 +157,7 @@ public Command createCommand() {
         after = compact(after);
 
         // compress pieces
-        return (jsonInput ? new JsonTemplatedCommand(before, after, jsonExtractors) : new TemplatedCommand(before, after, valueWriter));
+        return (jsonInput ? new JsonTemplatedCommand(before, after, jsonExtractors, settings) : new TemplatedCommand(before, after, valueWriter));
     }
 
     protected void writeAfterObject(List<Object> after) {
@@ -22,14 +22,14 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.elasticsearch.hadoop.cfg.Settings;
+import org.elasticsearch.hadoop.serialization.BytesWriter;
 import org.elasticsearch.hadoop.serialization.builder.NoOpValueWriter;
 import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
 import org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.BytesArray;
+import org.elasticsearch.hadoop.util.ObjectUtils;
 
 /**
  * Dedicated JSON command that skips the content generation phase (since the data is already JSON).
@@ -39,36 +39,21 @@ class JsonTemplatedCommand extends TemplatedCommand {
     private static Log log = LogFactory.getLog(JsonTemplatedCommand.class);
 
     private final JsonFieldExtractors jsonExtractors;
+    private final BytesWriter jsonWriter;
 
-    public JsonTemplatedCommand(Collection<Object> beforeObject, Collection<Object> afterObject, JsonFieldExtractors jsonExtractors) {
+    public JsonTemplatedCommand(Collection<Object> beforeObject, Collection<Object> afterObject,
+            JsonFieldExtractors jsonExtractors, Settings settings) {
         super(beforeObject, afterObject, new NoOpValueWriter());
         this.jsonExtractors = jsonExtractors;
+        this.jsonWriter = ObjectUtils.instantiate(settings.getSerializerBytesWriterClassName(), settings);
     }
 
     @Override
     protected Object preProcess(Object object, BytesArray storage) {
         // serialize the json early on and copy it to storage
         Assert.notNull(object, "Empty/null JSON document given...");
-        Assert.isTrue(object instanceof Writable,
-                String.format("Class [%s] not supported; only Hadoop Writables", object.getClass()));
-
-        // handle common cases
-        if (object instanceof Text) {
-            Text t = (Text) object;
-            storage.bytes(t.getBytes(), t.getLength());
-        }
-        else if (object instanceof BytesWritable) {
-            BytesWritable b = (BytesWritable) object;
-            storage.bytes(b.getBytes(), b.getLength());
-        }
-        else {
-            // fall-back to generic toString contract
-            if (log.isTraceEnabled()) {
-                log.trace(String.format("Unknown Writable type for object [%s], using default toString()", object));
-            }
-
-            storage.bytes(object.toString());
-        }
+        jsonWriter.write(object, storage);
 
         if (log.isTraceEnabled()) {
             log.trace(String.format("About to extract information from [%s]", storage));
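`preProcess` now delegates the Writable-to-bytes copy to whatever `BytesWriter` is configured. A plausible shape for the `WritableBytesWriter` registered in the M/R `init()` hunk above, reconstructed from the code this commit removes (treat it as a sketch: the committed class is not shown on this page, and the `BytesWriter.write(Object, BytesArray)` signature is inferred from the call site):

```java
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.serialization.BytesWriter;
import org.elasticsearch.hadoop.util.BytesArray;

public class WritableBytesWriter implements BytesWriter {

    // Copies an already-JSON Writable payload into storage without
    // running it through a ValueWriter.
    @Override
    public void write(Object object, BytesArray storage) {
        // handle the common Writable cases
        if (object instanceof Text) {
            Text t = (Text) object;
            storage.bytes(t.getBytes(), t.getLength());
        }
        else if (object instanceof BytesWritable) {
            BytesWritable b = (BytesWritable) object;
            storage.bytes(b.getBytes(), b.getLength());
        }
        else {
            // fall back to the generic toString() contract
            storage.bytes(object.toString());
        }
    }
}
```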
@@ -51,8 +51,8 @@ public class RestQueryTest {
     public void start() throws IOException {
         settings = new TestSettings("rest/savebulk");
         //testSettings.setPort(9200)
-        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, JdkValueWriter.class.getName());
-        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, JdkValueWriter.class.getName());
+        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
+        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         client = new RestRepository(settings);
         client.waitForYellow();
     }
@@ -38,7 +38,7 @@ public class RestSaveTest {
     public void testBulkWrite() throws Exception {
         TestSettings testSettings = new TestSettings("rest/savebulk");
         //testSettings.setPort(9200)
-        testSettings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, JdkValueWriter.class.getName());
+        testSettings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         RestRepository client = new RestRepository(testSettings);
 
         Scanner in = new Scanner(getClass().getResourceAsStream("/artists.dat")).useDelimiter("\\n|\\t");
@@ -61,7 +61,7 @@ public void testBulkWrite() throws Exception {
     @Test
     public void testEmptyBulkWrite() throws Exception {
         TestSettings testSettings = new TestSettings("rest/emptybulk");
-        testSettings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, JdkValueWriter.class.getName());
+        testSettings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         RestClient client = new RestRepository(testSettings).getRestClient();
         client.bulk(new Resource(testSettings), new TrackingBytesArray(new BytesArray("{}")));
         client.close();
