Add the ability to create IndexPartition based on the desired number of documents per split #812

Closed
wants to merge 3 commits into from
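
As the title says, this change lets reads be partitioned by a desired number of documents per split rather than strictly one split per shard. For orientation, here is a minimal job-configuration sketch of how a user would drive the feature. It is not part of this diff: the property name es.input.max.docs.per.partition is an assumption based on the PR title, and the class/method names are illustrative; check ConfigurationOptions in the merged code for the final constant.

// Sketch only: requesting roughly 100k documents per input split.
// "es.input.max.docs.per.partition" is assumed from the PR title, not taken from this diff.
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class PartitionedReadJob {
    public static JobConf configure() {
        JobConf conf = new JobConf();
        conf.set("es.nodes", "localhost:9200");
        conf.set("es.resource", "my-index/my-type");            // index/type to read from
        conf.set("es.input.max.docs.per.partition", "100000");  // desired documents per split (assumed name)
        conf.setInputFormat(EsInputFormat.class);               // keys are doc ids, values are documents
        return conf;
    }
}
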
1 change: 1 addition & 0 deletions build.gradle
@@ -426,6 +426,7 @@ project(":elasticsearch-hadoop-mr") {
}

testCompile "io.netty:netty-all:4.0.23.Final"
testCompile "org.elasticsearch:securemock:1.2"
}

def generatedResources = "$buildDir/generated-resources/main"
@@ -64,7 +64,6 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
private final String nodes;
private final int port;
private final Properties props;
private boolean IS_ES_20;

private static Log log = LogFactory.getLog(EsHadoopScheme.class);

@@ -91,7 +90,6 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -110,7 +108,6 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
@@ -58,8 +58,6 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
private final Properties props;
private transient RestRepository client;

private boolean IS_ES_20;

EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
this.resource = index;
this.query = query;
@@ -80,7 +78,6 @@ public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object
Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -26,14 +26,17 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.FieldPresenceValidation;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;

import cascading.flow.FlowProcess;
@@ -83,9 +86,20 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
if (validation.isRequired()) {
MappingUtils.validateMapping(fields, mapping, validation, log);
}

input = QueryBuilder.query(settings).fields(StringUtils.concatenate(fields, ",")).
build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));

EsMajorVersion esVersion = settings.getInternalVersionOrThrow();
Resource read = new Resource(settings, true);
SearchRequestBuilder queryBuilder =
new SearchRequestBuilder(esVersion, settings.getReadMetadata() && settings.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.filters(QueryUtils.parseFilters(settings))
.fields(StringUtils.concatenate(fields, ","));
input = queryBuilder.build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));
}
return new TupleEntrySchemeIterator<Properties, ScrollQuery>(flowProcess, getScheme(), input, getIdentifier());
}
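
The hunk above is the heart of the read-path change: the old one-liner QueryBuilder.query(settings)...build(...) becomes an explicit SearchRequestBuilder chain. The standalone sketch below (not part of this change; the wrapper class and method are illustrative) shows how the same pieces fit together, using only types this diff already imports:

// Illustration only: wiring a scroll read with the builder-style API used above.
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.util.EsMajorVersion;

class ScrollReadSketch {
    static long countHits(Settings settings) throws Exception {
        RestRepository repository = new RestRepository(settings);
        Resource read = new Resource(settings, true);                 // true = read resource
        EsMajorVersion version = settings.getInternalVersionOrThrow();

        SearchRequestBuilder request =
                new SearchRequestBuilder(version, settings.getReadMetadata() && settings.getReadMetadataVersion())
                        .indices(read.index())
                        .types(read.type())
                        .query(QueryUtils.parseQuery(settings))
                        .filters(QueryUtils.parseFilters(settings))
                        .scroll(settings.getScrollKeepAlive())
                        .size(settings.getScrollSize())
                        .limit(settings.getScrollLimit());

        ScrollReader reader = new ScrollReader(
                new ScrollReaderConfig(new JdkValueReader(), repository.getMapping(), settings));

        ScrollQuery scroll = request.build(repository, reader);
        long hits = 0;
        while (scroll.hasNext()) {
            scroll.next();                                            // one Object[] per hit
            hits++;
        }
        scroll.close();
        repository.close();
        return hits;
    }
}
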
@@ -54,7 +54,7 @@ static class EsHiveSplit extends FileSplit {
private Path path;

EsHiveSplit() {
this(new ShardInputSplit(), null);
this(new EsInputSplit(), null);
}

EsHiveSplit(InputSplit delegate, Path path) {
@@ -119,8 +119,8 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public AbstractWritableShardRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
public AbstractWritableEsInputRecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
InputSplit delegate = ((EsHiveSplit) split).delegate;
return isOutputAsJson(job) ? new JsonWritableShardRecordReader(delegate, job, reporter) : new WritableShardRecordReader(delegate, job, reporter);
return isOutputAsJson(job) ? new JsonWritableEsInputRecordReader(delegate, job, reporter) : new WritableEsInputRecordReader(delegate, job, reporter);
}
}
19 changes: 6 additions & 13 deletions hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
@@ -70,8 +70,6 @@ public class EsSerDe extends AbstractSerDe {
private BulkCommand command;

private boolean writeInitialized = false;
private boolean readInitialized = false;
private boolean IS_ES_20 = true;
private boolean trace = false;


@@ -99,15 +97,10 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException

@Override
public Object deserialize(Writable blob) throws SerDeException {
if (!readInitialized) {
readInitialized = true;
IS_ES_20 = SettingsUtils.isEs20(settings);
}

if (blob == null || blob instanceof NullWritable) {
return null;
}
Object des = hiveFromWritable(structTypeInfo, blob, alias, IS_ES_20);
Object des = hiveFromWritable(structTypeInfo, blob, alias);

if (trace) {
log.trace(String.format("Deserialized [%s] to [%s]", blob, des));
@@ -161,7 +154,7 @@ private void lazyInitializeWrite() {


@SuppressWarnings("unchecked")
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, boolean IS_ES_20) {
static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias) {
if (data == null || data instanceof NullWritable) {
return null;
}
@@ -175,7 +168,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b

List<Object> list = new ArrayList<Object>();
for (Writable writable : aw.get()) {
list.add(hiveFromWritable(listElementType, writable, alias, IS_ES_20));
list.add(hiveFromWritable(listElementType, writable, alias));
}

return list;
@@ -188,8 +181,8 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
Map<Object, Object> map = new LinkedHashMap<Object, Object>();

for (Entry<Writable, Writable> entry : mw.entrySet()) {
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias, IS_ES_20),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias, IS_ES_20));
map.put(hiveFromWritable(mapType.getMapKeyTypeInfo(), entry.getKey(), alias),
hiveFromWritable(mapType.getMapValueTypeInfo(), entry.getValue(), alias));
}

return map;
@@ -215,7 +208,7 @@ static Object hiveFromWritable(TypeInfo type, Writable data, FieldAlias alias, b
break;
}
}
struct.add(hiveFromWritable(info.get(index), result, alias, IS_ES_20));
struct.add(hiveFromWritable(info.get(index), result, alias));
}
return struct;
}
@@ -19,21 +19,24 @@
package org.elasticsearch.hadoop.integration.rest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.QueryBuilder;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.After;
import org.junit.Before;
@@ -45,7 +48,7 @@
/**
*/
public class AbstractRestQueryTest {

private static Log log = LogFactory.getLog(AbstractRestQueryTest.class);
private RestRepository client;
private Settings settings;

@@ -66,7 +69,7 @@ public void stop() throws Exception {

@Test
public void testShardInfo() throws Exception {
Map<Shard, Node> shards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];
List<List<Map<String, Object>>> shards = client.getReadTargetShards();
System.out.println(shards);
assertNotNull(shards);
}
@@ -75,7 +78,18 @@ public void testShardInfo() throws Exception {
public void testQueryBuilder() throws Exception {
Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");
QueryBuilder qb = QueryBuilder.query(sets);
EsMajorVersion esVersion = EsMajorVersion.V_5_X;
Resource read = new Resource(sets, true);
SearchRequestBuilder qb =
new SearchRequestBuilder(esVersion, sets.getReadMetadata() && sets.getReadMetadataVersion())
.types(read.type())
.indices(read.index())
.query(QueryUtils.parseQuery(sets))
.scroll(sets.getScrollKeepAlive())
.size(sets.getScrollSize())
.limit(sets.getScrollLimit())
.fields(sets.getScrollFields())
.filters(QueryUtils.parseFilters(sets));
Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
@@ -90,33 +104,4 @@ public void testQueryBuilder() throws Exception {

assertTrue(count > 0);
}

@Test
public void testQueryShards() throws Exception {
Map<Shard, Node> targetShards = (Map<Shard, Node>) client.getReadTargetShards(false)[1];

Field mapping = client.getMapping();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mapping, true, "_metadata", false, false);
ScrollReader reader = new ScrollReader(scrollReaderConfig);

Settings sets = settings.copy();
sets.setProperty(ConfigurationOptions.ES_QUERY, "?q=me*");

String nodeId = targetShards.values().iterator().next().getId();
ScrollQuery query = QueryBuilder.query(sets)
.shard("0")
.node(nodeId)
.build(client, reader);

int count = 0;
for (; query.hasNext();) {
Object[] next = query.next();
System.out.println(Arrays.toString(next));
assertNotNull(next);
count++;
}

assertTrue(count > 0);
}
}