allow raw json as a source
add support for indexing raw json without any type conversion
currently only the M/R module has been tested properly
the data is passed as is, without any transformation
if needed, the raw document can be inspected to extract 'id' information

relates #9
fixes #75
fixes #126
costin committed Jan 16, 2014
1 parent 108fc94 commit e127d09
Showing 61 changed files with 1,835 additions and 236 deletions.
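Before the file-by-file changes, a concrete illustration of what the new option enables: a minimal sketch of an old-style mapred job that streams pre-rendered JSON documents to Elasticsearch. EsOutputFormat and the es.input.json / es.resource properties appear in this commit and the existing configuration options; the job class, index name, and input path are illustrative assumptions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;

    public class RawJsonJob {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            conf.set("es.resource", "radio/artists");  // target index/type (illustrative)
            conf.set("es.input.json", "true");         // the new flag: documents are passed through as-is
            conf.setOutputFormat(EsOutputFormat.class);
            conf.setMapOutputValueClass(Text.class);   // each value is already a JSON document
            conf.setSpeculativeExecution(false);       // avoid duplicate writes to an external system
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            JobClient.runJob(conf);
        }
    }

Because es.input.json is set, the Text values are shipped without any ValueWriter conversion — which is why the commit swaps in NoOpValueWriter and NoOpFieldExtractor below.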
@@ -23,8 +23,8 @@
 import org.apache.hadoop.io.Writable;
 import org.elasticsearch.hadoop.mr.WritableValueWriter;
 import org.elasticsearch.hadoop.serialization.Generator;
-import org.elasticsearch.hadoop.serialization.JdkValueWriter;
-import org.elasticsearch.hadoop.serialization.ValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
 
 import cascading.scheme.SinkCall;
 import cascading.tuple.Tuple;
@@ -34,8 +34,8 @@
 import org.elasticsearch.hadoop.mr.EsInputFormat;
 import org.elasticsearch.hadoop.mr.EsOutputFormat;
 import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
-import org.elasticsearch.hadoop.serialization.JdkValueReader;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
+import org.elasticsearch.hadoop.rest.InitializationUtils;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
 import org.elasticsearch.hadoop.util.FieldAlias;
 import org.elasticsearch.hadoop.util.StringUtils;
 
@@ -118,8 +118,8 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe
         // define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
         Settings set = SettingsManager.loadFrom(conf);
 
-        SerializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
-        SerializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class));
+        InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
+        InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class));
 
         // NB: we need to set this property even though it is not being used - and since and URI causes problem, use only the resource/file
         //conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
@@ -29,8 +29,7 @@
 import org.elasticsearch.hadoop.rest.InitializationUtils;
 import org.elasticsearch.hadoop.rest.RestRepository;
 import org.elasticsearch.hadoop.rest.ScrollQuery;
-import org.elasticsearch.hadoop.serialization.JdkValueReader;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
 import org.elasticsearch.hadoop.util.FieldAlias;
 
 import cascading.flow.FlowProcess;
@@ -116,8 +115,8 @@ private void initClient(Properties props) {
         if (client == null) {
             Settings settings = SettingsManager.loadFrom(props).setHosts(host).setPort(port).setResource(resource);
 
-            SerializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
-            SerializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, LogFactory.getLog(EsTap.class));
+            InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
+            InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, LogFactory.getLog(EsTap.class));
             settings.save();
             client = new RestRepository(settings);
         }
@@ -31,8 +31,8 @@
 import org.elasticsearch.hadoop.rest.RestRepository;
 import org.elasticsearch.hadoop.rest.ScrollQuery;
 import org.elasticsearch.hadoop.rest.dto.mapping.Field;
-import org.elasticsearch.hadoop.serialization.JdkValueReader;
 import org.elasticsearch.hadoop.serialization.ScrollReader;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
 import org.elasticsearch.hadoop.util.StringUtils;
 
 import cascading.flow.FlowProcess;
@@ -91,6 +91,10 @@ public interface ConfigurationOptions {
     /** Value reader - setup automatically; can be overridden for custom types */
     String ES_SERIALIZATION_READER_CLASS = "es.ser.reader.class";
 
+    /** Input options **/
+    String ES_INPUT_JSON = "es.input.json";
+    String ES_INPUT_JSON_DEFAULT = "false";
+
     /** Field options **/
     String ES_FIELD_READ_EMPTY_AS_NULL = "es.field.read.empty.as.null";
     String ES_FIELD_READ_EMPTY_AS_NULL_DEFAULT = "true";
6 changes: 5 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -124,6 +124,10 @@ public boolean getIndexReadMissingAsEmpty() {
         return Booleans.parseBoolean(getProperty(ES_INDEX_READ_MISSING_AS_EMPTY, ES_INDEX_READ_MISSING_AS_EMPTY_DEFAULT));
     }
 
+    public boolean getInputAsJson() {
+        return Booleans.parseBoolean(getProperty(ES_INPUT_JSON, ES_INPUT_JSON_DEFAULT));
+    }
+
     public String getOperation() {
         return getProperty(ES_WRITE_OPERATION, ES_WRITE_OPERATION_DEFAULT).toLowerCase(Locale.ENGLISH);
     }
@@ -148,7 +152,7 @@ public String getMappingTtl() {
         return getProperty(ES_MAPPING_TTL);
     }
 
-    public Object getMappingTimestamp() {
+    public String getMappingTimestamp() {
         return getProperty(ES_MAPPING_TIMESTAMP);
     }
 
7 changes: 3 additions & 4 deletions src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
@@ -46,9 +46,8 @@
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.cfg.SettingsManager;
 import org.elasticsearch.hadoop.rest.InitializationUtils;
-import org.elasticsearch.hadoop.serialization.BulkCommands;
-import org.elasticsearch.hadoop.serialization.Command;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
+import org.elasticsearch.hadoop.serialization.command.BulkCommands;
+import org.elasticsearch.hadoop.serialization.command.Command;
 import org.elasticsearch.hadoop.util.BytesArray;
 import org.elasticsearch.hadoop.util.FieldAlias;
 
@@ -123,7 +122,7 @@ private void lazyInitializeWrite() {
         writeInitialized = true;
         Settings settings = SettingsManager.loadFrom(tableProperties);
 
-        SerializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
+        InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
         InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
         this.command = BulkCommands.create(settings);
     }
@@ -35,7 +35,6 @@
 import org.elasticsearch.hadoop.mr.EsOutputFormat;
 import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
 import org.elasticsearch.hadoop.rest.InitializationUtils;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.StringUtils;
 
@@ -92,8 +91,8 @@ private void init(TableDesc tableDesc) {
 
         InitializationUtils.checkIdForOperation(settings);
 
-        SerializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
-        SerializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
+        InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
+        InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
         InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
 
         settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS,
@@ -26,7 +26,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.elasticsearch.hadoop.cfg.Settings;
-import org.elasticsearch.hadoop.serialization.ConstantFieldExtractor;
+import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.StringUtils;
 
@@ -31,7 +31,7 @@
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.serialization.Generator;
 import org.elasticsearch.hadoop.serialization.SettingsAware;
-import org.elasticsearch.hadoop.serialization.ValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
 import org.elasticsearch.hadoop.util.FieldAlias;
 
 /**
5 changes: 2 additions & 3 deletions src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -48,8 +48,7 @@
 import org.elasticsearch.hadoop.rest.dto.Shard;
 import org.elasticsearch.hadoop.rest.dto.mapping.Field;
 import org.elasticsearch.hadoop.serialization.ScrollReader;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
-import org.elasticsearch.hadoop.serialization.ValueReader;
+import org.elasticsearch.hadoop.serialization.builder.ValueReader;
 import org.elasticsearch.hadoop.util.IOUtils;
 import org.elasticsearch.hadoop.util.ObjectUtils;
 import org.elasticsearch.hadoop.util.StringUtils;
@@ -171,7 +170,7 @@ void init(ShardInputSplit esSplit, Configuration cfg) {
         this.esSplit = esSplit;
 
         // initialize mapping/ scroll reader
-        SerializationUtils.setValueReaderIfNotSet(settings, WritableValueReader.class, log);
+        InitializationUtils.setValueReaderIfNotSet(settings, WritableValueReader.class, log);
         ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
 
         String mappingData = esSplit.mapping;
5 changes: 2 additions & 3 deletions src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
@@ -43,8 +43,7 @@
 import org.elasticsearch.hadoop.rest.RestRepository;
 import org.elasticsearch.hadoop.rest.dto.Node;
 import org.elasticsearch.hadoop.rest.dto.Shard;
-import org.elasticsearch.hadoop.serialization.MapWritableFieldExtractor;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
+import org.elasticsearch.hadoop.serialization.field.MapWritableFieldExtractor;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.SettingsUtils;
 import org.elasticsearch.hadoop.util.StringUtils;
@@ -160,7 +159,7 @@ protected void init() throws IOException {
 
         Settings settings = SettingsManager.loadFrom(cfg);
 
-        SerializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, log);
+        InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, log);
         InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, log);
         InitializationUtils.discoverNodesIfNeeded(settings, log);
         // pick the host based on id
@@ -32,7 +32,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.elasticsearch.hadoop.serialization.FieldType;
-import org.elasticsearch.hadoop.serialization.JdkValueReader;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
 
 public class WritableValueReader extends JdkValueReader {
 
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.io.AbstractMapWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BooleanWritable;
@@ -39,7 +40,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.elasticsearch.hadoop.serialization.Generator;
-import org.elasticsearch.hadoop.serialization.ValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
 
 @SuppressWarnings("deprecation")
 public class WritableValueWriter implements ValueWriter<Writable> {
@@ -67,6 +68,9 @@ else if (writable instanceof UTF8) {
             UTF8 utf8 = (UTF8) writable;
             generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength());
         }
+        else if (writable instanceof ShortWritable) {
+            generator.writeNumber(((ShortWritable) writable).get());
+        }
         else if (writable instanceof IntWritable) {
             generator.writeNumber(((IntWritable) writable).get());
         }
5 changes: 2 additions & 3 deletions src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java
@@ -59,7 +59,6 @@
 import org.elasticsearch.hadoop.cfg.SettingsManager;
 import org.elasticsearch.hadoop.mr.EsOutputFormat;
 import org.elasticsearch.hadoop.rest.InitializationUtils;
-import org.elasticsearch.hadoop.serialization.SerializationUtils;
 import org.elasticsearch.hadoop.util.IOUtils;
 import org.elasticsearch.hadoop.util.ObjectUtils;
 import org.elasticsearch.hadoop.util.StringUtils;
@@ -149,8 +148,8 @@ private void init(String location, Job job) {
         boolean changed = false;
         InitializationUtils.checkIdForOperation(settings);
 
-        changed |= SerializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
-        changed |= SerializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
+        changed |= InitializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
+        changed |= InitializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
         changed |= InitializationUtils.setFieldExtractorIfNotSet(settings, PigFieldExtractor.class, log);
         settings.save();
     }
@@ -22,13 +22,11 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.elasticsearch.hadoop.cfg.Settings;
-import org.elasticsearch.hadoop.serialization.ConstantFieldExtractor;
+import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
 import org.elasticsearch.hadoop.util.Assert;
 
 public class PigFieldExtractor extends ConstantFieldExtractor {
 
-    private String fieldName;
-
     @Override
     protected String extractField(Object target) {
         if (target instanceof PigTuple) {
@@ -37,14 +35,14 @@ protected String extractField(Object target) {
 
             for (int i = 0; i < fields.length; i++) {
                 ResourceFieldSchema field = fields[i];
-                if (fieldName.equals(field.getName())) {
+                if (getFieldName().equals(field.getName())) {
                     byte type = field.getType();
                     Assert.isTrue(DataType.isAtomic(type),
-                            String.format("Unsupported data type [%s] for field [%s]; use only 'primitives'", DataType.findTypeName(type), fieldName));
+                            String.format("Unsupported data type [%s] for field [%s]; use only 'primitives'", DataType.findTypeName(type), getFieldName()));
                     try {
                         return pt.getTuple().get(i).toString();
                     } catch (ExecException ex) {
-                        throw new IllegalStateException(String.format("Cannot retrieve field [%s]", fieldName), ex);
+                        throw new IllegalStateException(String.format("Cannot retrieve field [%s]", getFieldName()), ex);
                     }
                 }
             }
@@ -56,6 +54,5 @@ protected String extractField(Object target) {
     @Override
     public void setSettings(Settings settings) {
         super.setSettings(settings);
-        fieldName = getFieldName();
     }
 }
@@ -22,7 +22,7 @@
 
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.TupleFactory;
-import org.elasticsearch.hadoop.serialization.JdkValueReader;
+import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
 
 public class PigValueReader extends JdkValueReader {
 
@@ -31,7 +31,7 @@
 import org.elasticsearch.hadoop.serialization.Generator;
 import org.elasticsearch.hadoop.serialization.SerializationException;
 import org.elasticsearch.hadoop.serialization.SettingsAware;
-import org.elasticsearch.hadoop.serialization.ValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
 import org.elasticsearch.hadoop.util.FieldAlias;
 import org.elasticsearch.hadoop.util.StringUtils;
 
@@ -29,9 +29,12 @@
 import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.cfg.SettingsManager;
-import org.elasticsearch.hadoop.serialization.ContentBuilder;
-import org.elasticsearch.hadoop.serialization.FieldExtractor;
-import org.elasticsearch.hadoop.serialization.ValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ContentBuilder;
+import org.elasticsearch.hadoop.serialization.builder.NoOpValueWriter;
+import org.elasticsearch.hadoop.serialization.builder.ValueReader;
+import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
+import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
+import org.elasticsearch.hadoop.serialization.field.NoOpFieldExtractor;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.BytesArray;
 import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
@@ -93,9 +96,20 @@ public static void checkIndexExistence(Settings settings, RestRepository client)
 
     public static boolean setFieldExtractorIfNotSet(Settings settings, Class<? extends FieldExtractor> clazz, Log log) {
         if (!StringUtils.hasText(settings.getMappingIdExtractorClassName())) {
-            settings.setProperty(ConfigurationOptions.ES_MAPPING_DEFAULT_EXTRACTOR_CLASS, clazz.getName());
             Log logger = (log != null ? log : LogFactory.getLog(clazz));
-            logger.debug(String.format("Using pre-defined field extractor [%s] as default", settings.getMappingIdExtractorClassName()));
+
+            String name = clazz.getName();
+            if (settings.getInputAsJson()) {
+                name = NoOpFieldExtractor.class.getName();
+                if (logger.isDebugEnabled()) {
+                    logger.debug(String.format("Elasticsearch input marked as JSON; using dedicated field extractor [%s] instead of [%s]", name, clazz));
+                }
+            }
+
+            settings.setProperty(ConfigurationOptions.ES_MAPPING_DEFAULT_EXTRACTOR_CLASS, name);
+            if (logger.isDebugEnabled()) {
+                logger.debug(String.format("Using pre-defined field extractor [%s] as default", settings.getMappingIdExtractorClassName()));
+            }
             return true;
         }
 
@@ -126,4 +140,39 @@ public static <T> void saveSchemaIfNeeded(Object conf, ValueWriter<T> schemaWrit
             client.close();
         }
     }
-}
+
+    public static boolean setValueWriterIfNotSet(Settings settings, Class<? extends ValueWriter<?>> clazz, Log log) {
+        if (!StringUtils.hasText(settings.getSerializerValueWriterClassName())) {
+            Log logger = (log != null ? log : LogFactory.getLog(clazz));
+
+            String name = clazz.getName();
+            if (settings.getInputAsJson()) {
+                name = NoOpValueWriter.class.getName();
+                if (logger.isDebugEnabled()) {
+                    logger.debug(String.format("Elasticsearch input marked as JSON; bypassing serialization through [%s] instead of [%s]", name, clazz));
+                }
+            }
+            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_CLASS, name);
+            if (logger.isDebugEnabled()) {
+                logger.debug(String.format("Using pre-defined writer serializer [%s] as default", settings.getSerializerValueWriterClassName()));
+            }
+            return true;
+        }
+
+        return false;
+    }
+
+    public static boolean setValueReaderIfNotSet(Settings settings, Class<? extends ValueReader> clazz, Log log) {
+
+        if (!StringUtils.hasText(settings.getSerializerValueReaderClassName())) {
+            settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_READER_CLASS, clazz.getName());
+            Log logger = (log != null ? log : LogFactory.getLog(clazz));
+            if (logger.isDebugEnabled()) {
+                logger.debug(String.format("Using pre-defined reader serializer [%s] as default", settings.getSerializerValueReaderClassName()));
+            }
+            return true;
+        }
+
+        return false;
+    }
+}
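To see the JSON bypass above end to end, a minimal sketch; SettingsManager.loadFrom(Properties) and the setter signatures are taken from this diff, while the harness class and the printed expectation are illustrative assumptions.

    import java.util.Properties;

    import org.elasticsearch.hadoop.cfg.Settings;
    import org.elasticsearch.hadoop.cfg.SettingsManager;
    import org.elasticsearch.hadoop.mr.WritableValueWriter;
    import org.elasticsearch.hadoop.rest.InitializationUtils;

    public class JsonBypassCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("es.input.json", "true");
            Settings settings = SettingsManager.loadFrom(props);

            // with es.input.json set, the requested writer is ignored and
            // NoOpValueWriter is registered so the raw bytes flow through untouched
            InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, null);
            System.out.println(settings.getSerializerValueWriterClassName());
            // expected (per this commit): org.elasticsearch.hadoop.serialization.builder.NoOpValueWriter
        }
    }

setFieldExtractorIfNotSet follows the same pattern, swapping in NoOpFieldExtractor for JSON input.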
@@ -30,9 +30,9 @@
 import org.elasticsearch.hadoop.rest.dto.Node;
 import org.elasticsearch.hadoop.rest.dto.Shard;
 import org.elasticsearch.hadoop.rest.dto.mapping.Field;
-import org.elasticsearch.hadoop.serialization.BulkCommands;
-import org.elasticsearch.hadoop.serialization.Command;
 import org.elasticsearch.hadoop.serialization.ScrollReader;
+import org.elasticsearch.hadoop.serialization.command.BulkCommands;
+import org.elasticsearch.hadoop.serialization.command.Command;
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.BytesArray;
 import org.elasticsearch.hadoop.util.BytesRef;
