Refactor the partitioning of an elasticsearch query
This commit changes how we split the query into multiple partitions.
For clusters running a version prior to v5.x:
* We create one partition for each shard of each requested index.
* If an alias is requested, the search routing and the alias filter are respected.
* The partition is no longer attached to a node or an IP. Only the shardId and the index name are defined, so that any replica in the cluster can be used when the partition is consumed. This makes retries possible if a node disappears during a job (see the sketch after this list).
* The ability to consume a partition on the node that is responsible for the index/shardId has been removed temporarily and should be re-added in a follow-up.
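A minimal Java sketch of the pre-5.x layout described above; the class and method names are illustrative assumptions, not the actual types introduced by this commit:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the partition type: only the index name and shard id
// are recorded, with no node or IP, so any replica can serve the shard and the
// partition can be retried elsewhere if a node disappears mid-job.
public class ShardPartitionSketch implements Serializable {

    private final String index;
    private final int shardId;

    public ShardPartitionSketch(String index, int shardId) {
        this.index = index;
        this.shardId = shardId;
    }

    /** Pre-5.x rule: one partition per shard of each requested index. */
    public static List<ShardPartitionSketch> partitionsFor(String index, int numberOfShards) {
        List<ShardPartitionSketch> partitions = new ArrayList<ShardPartitionSketch>();
        for (int shard = 0; shard < numberOfShards; shard++) {
            partitions.add(new ShardPartitionSketch(index, shard));
        }
        return partitions;
    }

    @Override
    public String toString() {
        return index + "/" + shardId;
    }
}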

For clusters running version v5.x:
* We first split by index, then by shard, and finally by the maximum number of documents allowed per partition (configurable through the new option named es.input.maxdocsperpartition). For instance, for an index with 5 shards, 1M documents per shard, and a maximum of 100,000 documents per partition, a match-all query would be split into 50 partitions, 10 per shard (see the sketch after this list).
* If an alias is requested, the search routing and the alias filter are respected.
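A minimal Java sketch of the arithmetic behind the v5.x split; the helper below is hypothetical and only illustrates the per-shard partition count implied by es.input.maxdocsperpartition, not the commit's actual code:

public class PartitionMathSketch {

    // Hypothetical helper mirroring the rule described above:
    // each shard is split into ceil(docsInShard / maxDocsPerPartition) partitions.
    static int partitionsPerShard(long docsInShard, long maxDocsPerPartition) {
        return (int) ((docsInShard + maxDocsPerPartition - 1) / maxDocsPerPartition);
    }

    public static void main(String[] args) {
        // The example from the commit message: 5 shards, 1M documents per shard,
        // and a 100,000-document cap per partition.
        int shards = 5;
        long docsPerShard = 1_000_000L;
        long maxDocsPerPartition = 100_000L;

        int perShard = partitionsPerShard(docsPerShard, maxDocsPerPartition); // 10
        int total = shards * perShard;                                        // 50
        System.out.println(perShard + " partitions per shard, " + total + " in total");
    }
}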

Fixes #778
jimczi committed Jul 26, 2016
1 parent d3db11d commit 3a307e1
Showing 60 changed files with 3,586 additions and 1,541 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -426,6 +426,7 @@ project(":elasticsearch-hadoop-mr") {
}

testCompile "io.netty:netty-all:4.0.23.Final"
testCompile "org.elasticsearch:securemock:1.2"
}

def generatedResources = "$buildDir/generated-resources/main"
@@ -64,7 +64,6 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
private final String nodes;
private final int port;
private final Properties props;
private boolean IS_ES_20;

private static Log log = LogFactory.getLog(EsHadoopScheme.class);

@@ -91,7 +90,6 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -110,7 +108,6 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
@@ -58,8 +58,6 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
private final Properties props;
private transient RestRepository client;

private boolean IS_ES_20;

EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
this.resource = index;
this.query = query;
@@ -80,7 +78,6 @@ public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object
Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -26,14 +26,17 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.FieldPresenceValidation;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;

import cascading.flow.FlowProcess;
@@ -83,9 +86,20 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
if (validation.isRequired()) {
MappingUtils.validateMapping(fields, mapping, validation, log);
}

input = QueryBuilder.query(settings).fields(StringUtils.concatenate(fields, ",")).
build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));

EsMajorVersion esVersion = settings.getInternalVersionOrThrow();
Resource read = new Resource(settings, true);
SearchRequestBuilder queryBuilder =
new SearchRequestBuilder(esVersion, settings.getReadMetadata() && settings.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.filters(QueryUtils.parseFilters(settings))
.fields(StringUtils.concatenate(fields, ","));
input = queryBuilder.build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));
}
return new TupleEntrySchemeIterator<Properties, ScrollQuery>(flowProcess, getScheme(), input, getIdentifier());
}
@@ -54,7 +54,7 @@ static class EsHiveSplit extends FileSplit {
private Path path;

EsHiveSplit() {
this(new ShardInputSplit(), null);
this(new EsInputSplit(), null);
}

EsHiveSplit(InputSplit delegate, Path path) {
@@ -119,8 +119,8 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public AbstractWritableShardRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
public AbstractWritableEsInputRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
InputSplit delegate = ((EsHiveSplit) split).delegate;
return isOutputAsJson(job) ? new JsonWritableShardRecordReader(delegate, job, reporter) : new WritableShardRecordReader(delegate, job, reporter);
return isOutputAsJson(job) ? new JsonWritableEsInputRecordReader(delegate, job, reporter) : new WritableEsInputRecordReader(delegate, job, reporter);
}
}
19 changes: 6 additions & 13 deletions hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
@@ -70,8 +70,6 @@ public class EsSerDe extends AbstractSerDe {
private BulkCommand command;

private boolean writeInitialized = false;
private boolean readInitialized = false;
private boolean IS_ES_20 = true;
private boolean trace = false;


@@ -99,15 +97,10 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException

@Override
public Object deserialize(Writable blob) throws SerDeException {
if (!readInitialized) {
readInitialized = true;
IS_ES_20 = SettingsUtils.isEs20(settings);
}

if (blob == null || blob instanceof NullWritable) {
return null;
}
Object des = hiveFromWritable(structTypeInfo, blob, alias, IS_ES_20);
Object des = hiveFromWritable(structTypeInfo, blob, alias);

if (trace) {
log.trace(String.format("Deserialized [%s] to [%s]", blob, des));
@@ -161,7 +154,7 @@ private void lazyInitializeWrite() {


@SuppressWarnings("unchecked")
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, boolean IS_ES_20) {
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias) {
if (data == null || data instanceof NullWritable) {
return null;
}
@@ -175,7 +168,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b

List<Object> list = new ArrayList<Object>();
for (Writable writable : aw.get()) {
list.add(hiveFromWritable(listElementType, writable, alias, IS_ES_20));
list.add(hiveFromWritable(listElementType, writable, alias));
}

return list;
@@ -188,8 +181,8 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
Map<Object, Object> map = new LinkedHashMap<Object, Object>();

for (Entry<Writable, Writable> entry : mw.entrySet()) {
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias, IS_ES_20),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias, IS_ES_20));
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias));
}

return map;
@@ -215,7 +208,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
break;
}
}
struct.add(hiveFromWritable(info.get(index), result, alias, IS_ES_20));
struct.add(hiveFromWritable(info.get(index), result, alias));
}
return struct;
}
@@ -19,21 +19,24 @@
package org.elasticsearch.hadoop.integration.rest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.After;
import org.junit.Before;
@@ -45,7 +48,7 @@
/**
*/
public class AbstractRestQueryTest {

private static Log log = LogFactory.getLog(AbstractRestQueryTest.class);
private RestRepository client;
private Settings settings;

@@ -66,7 +69,7 @@ public void stop() throws Exception {

@Test
public void testShardInfo() throws Exception {
Map<Shard, Node> shards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];
List<List<Map<String, Object>>> shards = client.getReadTargetShards();
System.out.println(shards);
assertNotNull(shards);
}
@@ -75,7 +78,18 @@ public void testShardInfo() throws Exception {
public void testQueryBuilder() throws Exception {
Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");
QueryBuilder qb = QueryBuilder.query(sets);
EsMajorVersion esVersion = EsMajorVersion.V_5_X;
Resource read = new Resource(settings, true);
SearchRequestBuilder qb =
new SearchRequestBuilder(esVersion, settings.getReadMetadata() && settings.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.fields(settings.getScrollFields())
.filters(QueryUtils.parseFilters(settings));
Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
@@ -90,33 +104,4 @@ public void testQueryBuilder() throws Exception {

assertTrue(count > 0);
}

@Test
public void testQueryShards() throws Exception {
Map<Shard, Node> targetShards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];

Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
ScrollReader reader = new ScrollReader(scrollReaderConfig);

Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");

String nodeId = targetShards.values().iterator().next().getId();
ScrollQuery query = QueryBuilder.query(sets)
.shard("0")
.node(nodeId)
.build(client, reader);

int count = 0;
for (; query.hasNext();) {
Object[] next = query.next();
System.out.println(Arrays.toString(next));
assertNotNull(next);
count++;
}

assertTrue(count > 0);
}
}
