Refactor how a query is partitioned
This change refactors how splits are created from an Elasticsearch query.
PartitionDefinition is split into two classes: ShardPartitionDefinition, which models index/shard partitioning, and SlicePartitionDefinition, which models index/slice partitioning.
SlicePartitionDefinition is used for clusters running Elasticsearch version 5, and the maximum number of documents per partition can be tuned through the configuration option
`es.input.maxdocsperpartition`.
The mapping is no longer attached to the partition and is retrieved lazily when needed.
The config is no longer attached to the partition and the main config is used when needed.
Some cleanups for building requests and queries.
ShardSorter has been removed since we can have more than one shardId per node. Alias filters and routing values are retrieved for each index and added to the partition request if needed.
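For illustration, a minimal sketch of how a Hadoop job might cap partition size with the new option and how the slice count could follow from it. Only the `es.input.maxdocsperpartition` key and its 100000 default (`ES_DEFAULT_MAX_DOCS_PER_PARTITION`) come from this commit; the `slicesFor` helper and its ceil-division are assumptions added for clarity, not the connector's actual code.

```java
import org.apache.hadoop.conf.Configuration;

public class MaxDocsPerPartitionExample {

    // Assumption: slice partitions are sized by ceil-dividing a shard's document
    // count by es.input.maxdocsperpartition; this arithmetic is illustrative only.
    static int slicesFor(long docCount, int maxDocsPerPartition) {
        return (int) Math.max(1L, (docCount + maxDocsPerPartition - 1) / maxDocsPerPartition);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap each input partition at 50k documents instead of the 100k default.
        conf.set("es.input.maxdocsperpartition", "50000");

        long shardDocCount = 1_250_000L;
        int maxDocs = conf.getInt("es.input.maxdocsperpartition", 100_000);
        System.out.println(slicesFor(shardDocCount, maxDocs)); // prints 25
    }
}
```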
jimczi committed Jul 22, 2016
1 parent 8c6718e commit e1ca9af
Showing 51 changed files with 2,050 additions and 1,497 deletions.
@@ -70,8 +70,8 @@ static Settings addDefaultsToSettings(Properties flowProperties, Properties tapP
}

static void initialDiscovery(Settings settings, Log log) {
InitializationUtils.discoverEsVersion(settings, log);
InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
}
@@ -64,7 +64,6 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
private final String nodes;
private final int port;
private final Properties props;
private boolean IS_ES_20;

private static Log log = LogFactory.getLog(EsHadoopScheme.class);

@@ -91,7 +90,6 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -110,7 +108,6 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
@@ -58,8 +58,6 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
private final Properties props;
private transient RestRepository client;

private boolean IS_ES_20;

EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
this.resource = index;
this.query = query;
@@ -80,7 +78,6 @@ public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object
Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -26,14 +26,19 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.FieldPresenceValidation;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils;
import org.elasticsearch.hadoop.util.EsVersion;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;

import cascading.flow.FlowProcess;
@@ -84,8 +89,19 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
MappingUtils.validateMapping(fields, mapping, validation, log);
}

input = QueryBuilder.query(settings).fields(StringUtils.concatenate(fields, ",")).
build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));
EsVersion esVersion = client.getRestClient().esVersion();
Resource read = new Resource(settings, true);
SearchRequestBuilder queryBuilder =
new SearchRequestBuilder(esVersion, settings.getReadMetadata() && settings.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.filters(QueryUtils.parseFilters(settings))
.fields(StringUtils.concatenate(fields, ","));
input = queryBuilder.build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));
}
return new TupleEntrySchemeIterator<Properties, ScrollQuery>(flowProcess, getScheme(), input, getIdentifier());
}
@@ -54,7 +54,7 @@ static class EsHiveSplit extends FileSplit {
private Path path;

EsHiveSplit() {
this(new ShardInputSplit(), null);
this(new EsInputSplit(), null);
}

EsHiveSplit(InputSplit delegate, Path path) {
@@ -119,8 +119,8 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public AbstractWritableShardRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
public AbstractWritableEsInputRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
InputSplit delegate = ((EsHiveSplit) split).delegate;
return isOutputAsJson(job) ? new JsonWritableShardRecordReader(delegate, job, reporter) : new WritableShardRecordReader(delegate, job, reporter);
return isOutputAsJson(job) ? new JsonWritableEsInputRecordReader(delegate, job, reporter) : new WritableEsInputRecordReader(delegate, job, reporter);
}
}
19 changes: 6 additions & 13 deletions hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
@@ -70,8 +70,6 @@ public class EsSerDe extends AbstractSerDe {
private BulkCommand command;

private boolean writeInitialized = false;
private boolean readInitialized = false;
private boolean IS_ES_20 = true;
private boolean trace = false;


@@ -99,15 +97,10 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException

@Override
public Object deserialize(Writable blob) throws SerDeException {
if (!readInitialized) {
readInitialized = true;
IS_ES_20 = SettingsUtils.isEs20(settings);
}

if (blob == null || blob instanceof NullWritable) {
return null;
}
Object des = hiveFromWritable(structTypeInfo, blob, alias, IS_ES_20);
Object des = hiveFromWritable(structTypeInfo, blob, alias);

if (trace) {
log.trace(String.format("Deserialized [%s] to [%s]", blob, des));
@@ -161,7 +154,7 @@ private void lazyInitializeWrite() {


@SuppressWarnings("unchecked")
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, boolean IS_ES_20) {
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias) {
if (data == null || data instanceof NullWritable) {
return null;
}
@@ -175,7 +168,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b

List<Object> list = new ArrayList<Object>();
for (Writable writable : aw.get()) {
list.add(hiveFromWritable(listElementType, writable, alias, IS_ES_20));
list.add(hiveFromWritable(listElementType, writable, alias));
}

return list;
@@ -188,8 +181,8 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
Map<Object, Object> map = new LinkedHashMap<Object, Object>();

for (Entry<Writable, Writable> entry : mw.entrySet()) {
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias, IS_ES_20),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias, IS_ES_20));
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias));
}

return map;
@@ -215,7 +208,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
break;
}
}
struct.add(hiveFromWritable(info.get(index), result, alias, IS_ES_20));
struct.add(hiveFromWritable(info.get(index), result, alias));
}
return struct;
}
@@ -19,21 +19,25 @@
package org.elasticsearch.hadoop.integration.rest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.EsVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.After;
import org.junit.Before;
@@ -45,7 +49,7 @@
/**
*/
public class AbstractRestQueryTest {

private static Log log = LogFactory.getLog(AbstractRestQueryTest.class);
private RestRepository client;
private Settings settings;

@@ -66,7 +70,7 @@ public void stop() throws Exception {

@Test
public void testShardInfo() throws Exception {
Map<Shard, Node> shards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];
List<List<Map<String, Object>>> shards = client.getReadTargetShards();
System.out.println(shards);
assertNotNull(shards);
}
@@ -75,7 +79,18 @@ public void testShardInfo() throws Exception {
public void testQueryBuilder() throws Exception {
Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");
QueryBuilder qb = QueryBuilder.query(sets);
EsVersion esVersion = EsVersion.V_5_X;
Resource read = new Resource(settings, true);
SearchRequestBuilder qb =
new SearchRequestBuilder(esVersion, settings.getReadMetadata() && settings.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.fields(settings.getScrollFields())
.filters(QueryUtils.parseFilters(settings));
Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
@@ -90,33 +105,4 @@ public void testQueryBuilder() throws Exception {

assertTrue(count > 0);
}

@Test
public void testQueryShards() throws Exception {
Map<Shard, Node> targetShards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];

Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
ScrollReader reader = new ScrollReader(scrollReaderConfig);

Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");

String nodeId = targetShards.values().iterator().next().getId();
ScrollQuery query = QueryBuilder.query(sets)
.shard("0")
.node(nodeId)
.build(client, reader);

int count = 0;
for (; query.hasNext();) {
Object[] next = query.next();
System.out.println(Arrays.toString(next));
assertNotNull(next);
count++;
}

assertTrue(count > 0);
}
}
@@ -127,6 +127,8 @@ public interface ConfigurationOptions {
String ES_SERIALIZATION_READER_VALUE_CLASS = "es.ser.reader.value.class";

/** Input options **/
String ES_MAX_DOCS_PER_PARTITION = "es.input.maxdocsperpartition";
int ES_DEFAULT_MAX_DOCS_PER_PARTITION = 100000;
String ES_INPUT_JSON = "es.input.json";
String ES_INPUT_JSON_DEFAULT = "no";

@@ -64,4 +64,8 @@ public InputStream loadResource(String location) {
public Properties asProperties() {
return props;
}

public static Settings readFrom(String ser) {
return new PropertiesSettings(new Properties()).load(ser);
}
}
9 changes: 9 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -453,6 +453,11 @@ public Settings setQuery(String query) {
return this;
}

public Settings setMaxDocsPerPartition(int size) {
setProperty(ES_MAX_DOCS_PER_PARTITION, Integer.toString(size));
return this;
}

protected String getResource() {
return getProperty(ES_RESOURCE);
}
@@ -469,6 +474,10 @@ public String getQuery() {
return getProperty(ES_QUERY);
}

public int getMaxDocsPerPartition() {
return Integer.parseInt(getProperty(ES_MAX_DOCS_PER_PARTITION, Integer.toString(ES_DEFAULT_MAX_DOCS_PER_PARTITION)));
}

public boolean getReadMetadata() {
return Booleans.parseBoolean(getProperty(ES_READ_METADATA, ES_READ_METADATA_DEFAULT));
}
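For reference, a minimal sketch exercising the Settings accessors added in this hunk. It is illustrative only and not part of the commit; it assumes the `PropertiesSettings(Properties)` constructor shown above is usable directly.

```java
import java.util.Properties;

import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.elasticsearch.hadoop.cfg.Settings;

public class MaxDocsSettingsSketch {
    public static void main(String[] args) {
        // Build a throwaway Settings instance and exercise the new accessors.
        Settings settings = new PropertiesSettings(new Properties());
        System.out.println(settings.getMaxDocsPerPartition()); // 100000 (the default)

        settings.setMaxDocsPerPartition(50_000);
        System.out.println(settings.getMaxDocsPerPartition()); // 50000
    }
}
```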
