add initial projection support with M&R and Pig implementation
data retrieval now fetches the requested fields individually to minimize the amount of data being transferred
Pig schemas are detected automatically (if present) and used to derive the field projection

relates to elastic#54
relates to elastic#13
costin committed Dec 2, 2013
1 parent 113ab42 commit 509730e
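
For context, a minimal sketch of how the new projection could be wired into a plain M&R job. Only the es.scroll.fields property comes from this commit; the resource property name, index/type, field list and the rest of the job setup are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.ESInputFormat;

public class ProjectionExample {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // hypothetical target index/type; the property name is an assumption
        cfg.set("es.resource", "radio/artists");
        // new in this commit: only these fields are retrieved per hit instead of the whole document
        cfg.set("es.scroll.fields", "name,url");

        Job job = Job.getInstance(cfg);
        job.setInputFormatClass(ESInputFormat.class);
        // configure mapper, output format etc. as usual, then submit the job
    }
}

On the Pig side no extra setting is needed: the projection is either pushed down by Pig (see ESStorage#pushProjection below) or derived from the store schema in ESStorage#setLocation.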
Showing 19 changed files with 455 additions and 140 deletions.
@@ -56,6 +56,9 @@ public interface ConfigurationOptions {
String ES_SCROLL_SIZE = "es.scroll.size";
String ES_SCROLL_SIZE_DEFAULT = "50";

/** Scroll fields */
String ES_SCROLL_FIELDS = "es.scroll.fields";

/** Serialization settings */

/** Value writer - setup automatically; can be overridden for custom types */
@@ -22,4 +22,5 @@ interface InternalConfigurationOptions extends ConfigurationOptions {

static final String INTERNAL_ES_TARGET_RESOURCE = "es.internal.mr.target.resource";
static final String INTERNAL_ES_TARGET_URI = "es.internal.mr.target.uri";
static final String INTERNAL_ES_TARGET_FIELDS = "es.internal.mr.target.fields";
}
5 changes: 5 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -66,6 +66,11 @@ public long getScrollSize() {
return Long.valueOf(getProperty(ES_SCROLL_SIZE, ES_SCROLL_SIZE_DEFAULT));
}

public String getScrollFields() {
String internalFields = getProperty(INTERNAL_ES_TARGET_FIELDS);
return (StringUtils.hasText(internalFields) ? internalFields : getProperty(ES_SCROLL_FIELDS));
}

public String getSerializerValueWriterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_CLASS);
}
6 changes: 6 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/ESInputFormat.java
@@ -192,6 +192,12 @@ void init(ShardInputSplit esSplit, Configuration cfg) {
.shard(esSplit.shardId)
.onlyNode(esSplit.nodeId);

String fields = settings.getScrollFields();
if (StringUtils.hasText(fields)) {
queryBuilder.fields(fields);
}


if (log.isDebugEnabled()) {
log.debug(String.format("Initializing RecordReader for [%s]", esSplit));
}
58 changes: 0 additions & 58 deletions src/main/java/org/elasticsearch/hadoop/pig/DateConverter.java

This file was deleted.

97 changes: 94 additions & 3 deletions src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
@@ -18,13 +18,16 @@
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -33,13 +36,18 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
@@ -48,6 +56,7 @@
import org.elasticsearch.hadoop.serialization.SerializationUtils;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

/**
* Pig storage for reading and writing data into an ElasticSearch index.
@@ -65,9 +74,10 @@
* The ElasticSearch host/port can be specified through Hadoop properties (see package description)
* or passed to the {@link #ESStorage(String)} constructor.
*/
public class ESStorage extends LoadFunc implements StoreFuncInterface, StoreMetadata {
public class ESStorage extends LoadFunc implements LoadPushDown, StoreFuncInterface, StoreMetadata {

private static final Log log = LogFactory.getLog(ESStorage.class);
private static final String FIELDS = "es.internal.mr.target.fields";
private final boolean trace = log.isTraceEnabled();

private Properties properties;
@@ -107,9 +117,13 @@ public void setStoreFuncUDFContextSignature(String signature) {
this.signature = signature;
}

private Properties getUDFProperties() {
return UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
}

@Override
public void checkSchema(ResourceSchema s) throws IOException {
Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
Properties props = getUDFProperties();

// save schema to back-end for JSON translation
if (props.getProperty(ResourceSchema.class.getName()) == null) {
@@ -145,7 +159,7 @@ public OutputFormat<Object, Map<Writable, Writable>> getOutputFormat() throws IOException {
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;

Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
Properties props = getUDFProperties();
String s = props.getProperty(ResourceSchema.class.getName());
this.schema = IOUtils.deserializeFromBase64(s);
this.pigTuple = new PigTuple(schema);
@@ -195,8 +209,67 @@ public void storeSchema(ResourceSchema schema, String location, Job job) throws
//
// LoadFunc
//
@SuppressWarnings("unchecked")
public void setLocation(String location, Job job) throws IOException {
init(location, job);

Configuration cfg = job.getConfiguration();

Settings settings = SettingsManager.loadFrom(cfg);

if (settings.getScrollFields() != null) {
return;
}

String fields = getUDFProperties().getProperty(FIELDS);
if (fields != null) {
if (log.isDebugEnabled()) {
log.debug(String.format("Found field project [%s] in UDF properties", fields));
}

cfg.set(FIELDS, fields);
return;
}

if (log.isTraceEnabled()) {
log.trace("No field projection specified, looking for existing stores...");
}

String mapValues = cfg.get(JobControlCompiler.PIG_MAP_STORES);
String reduceValues = cfg.get(JobControlCompiler.PIG_REDUCE_STORES);

List<POStore> mapStore = Collections.emptyList();
List<POStore> reduceStore = Collections.emptyList();

if (StringUtils.hasText(mapValues)) {
mapStore = (List<POStore>) ObjectSerializer.deserialize(mapValues);
}
if (StringUtils.hasText(reduceValues)) {
reduceStore = (List<POStore>) ObjectSerializer.deserialize(reduceValues);
}
if (mapStore.size() + reduceStore.size() > 1) {
log.warn("Too many POstores - cannot properly determine Pig schema");
}
else if (mapStore.size() + reduceStore.size() == 0) {
log.warn("No POstores - cannot properly determine Pig schema");
}
else {
POStore store = (reduceStore.isEmpty() ? mapStore.get(0) : reduceStore.get(0));
// no schema specified - load all fields (or the default)
if (store.getSchema() == null) {
if (log.isTraceEnabled()) {
log.trace(String.format("Store [%s] defines no schema; falling back to default projection", store));
}
return;
}
else {
fields = PigUtils.asProjection(store.getSchema(), properties);
}
if (log.isDebugEnabled()) {
log.debug(String.format("Found field projection [%s] in store %s", fields, store));
}
cfg.set(FIELDS, fields);
}
}


@@ -245,4 +318,22 @@ public Tuple getNext() throws IOException {
throw new IOException("interrupted", ex);
}
}

//
// LoadPushDown
//
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}

@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
String fields = PigUtils.asProjection(requiredFieldList, properties);
getUDFProperties().setProperty(FIELDS, fields);
if (log.isTraceEnabled()) {
log.trace(String.format("Given push projection; saving field projection [%s]", fields));
}
return new RequiredFieldResponse(true);
}
}
28 changes: 0 additions & 28 deletions src/main/java/org/elasticsearch/hadoop/pig/FieldAlias.java
@@ -16,20 +16,14 @@
package org.elasticsearch.hadoop.pig;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.util.StringUtils;

/**
* Basic class for field aliasing since Pig column names are restricted to: [0-9a-z_] and cannot start with numbers. Without any mapping, the alias will convert all Pig columns to lower case.
* Note that Pig is case sensitive.
*/
class FieldAlias {

private static final String MAPPING_NAMES = "es.mapping.names";

private final Map<String, String> pigToES;

public FieldAlias() {
@@ -49,26 +43,4 @@ String toES(String string) {
}
return alias;
}

static FieldAlias load(Settings settings) {
List<String> aliases = StringUtils.tokenize(settings.getProperty(MAPPING_NAMES), ",");

Map<String, String> aliasMap = new LinkedHashMap<String, String>();

if (aliases != null) {
for (String string : aliases) {
// split alias
string = string.trim();
int index = string.indexOf(":");
if (index > 0) {
String key = string.substring(0, index);
// save the lower-case version as well to speed up lookups
aliasMap.put(key, string.substring(index + 1));
aliasMap.put(key.toLowerCase(), string.substring(index + 1));
}
}
}

return new FieldAlias(aliasMap);
}
}
