Skip to content

Commit

Permalink
Fixes for RestService.find{Shard,Slice}Partitions
Browse files Browse the repository at this point in the history
Add securemock as a test dependency for mr
Add tests
  • Loading branch information
jimczi committed Jul 26, 2016
1 parent 43a3527 commit e274b27
Show file tree
Hide file tree
Showing 4 changed files with 1,056 additions and 12 deletions.
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -426,6 +426,7 @@ project(":elasticsearch-hadoop-mr") {
}

testCompile "io.netty:netty-all:4.0.23.Final"
testCompile "org.elasticsearch:securemock:1.2"
}

def generatedResources = "$buildDir/generated-resources/main"
Expand Down
121 changes: 109 additions & 12 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java
Expand Up @@ -43,7 +43,7 @@

public abstract class RestService implements Serializable {

public static abstract class PartitionDefinition implements Serializable {
public static abstract class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> {
private final byte type;
public final String index;

Expand Down Expand Up @@ -73,6 +73,30 @@ public static PartitionDefinition create(DataInput in) throws IOException {
throw new IOException("failed to deserialize partition type");
}
}

/**
 * Orders partition definitions by index name and, when the index names are
 * equal, by the partition type tag.
 *
 * The type tie-break keeps this ordering from reporting {@code 0} for two
 * definitions that {@link #equals(Object)} considers different because their
 * type tags differ, which would otherwise make sorted sets and maps drop
 * one of them.
 *
 * @param o the definition to compare against
 * @return a negative, zero or positive value as this definition sorts before,
 *         together with, or after {@code o}
 * @throws NullPointerException if {@code o} or this definition's index is null
 */
@Override
public int compareTo(PartitionDefinition o) {
    int cmp = index.compareTo(o.index);
    if (cmp != 0) {
        return cmp;
    }
    // byte operands are promoted to int, so this subtraction cannot overflow
    return type - o.type;
}

/**
 * Two definitions are equal when they have the same runtime class, the same
 * type tag and the same index name (two null index names count as equal).
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} is an equal partition definition
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (o.getClass() != getClass()) {
        return false;
    }

    final PartitionDefinition other = (PartitionDefinition) o;
    if (other.type != type) {
        return false;
    }
    if (index == null) {
        return other.index == null;
    }
    return index.equals(other.index);
}

/**
 * Hash code derived from the type tag and the index name, the same fields
 * that {@link #equals(Object)} compares. A null index contributes 0.
 *
 * @return the combined hash of type and index
 */
@Override
public int hashCode() {
    final int indexHash = (index == null) ? 0 : index.hashCode();
    // the byte-valued type is widened to int before being scaled
    return 31 * type + indexHash;
}
}

public static class ShardPartitionDefinition extends PartitionDefinition {
Expand Down Expand Up @@ -101,6 +125,36 @@ public String toString() {
.append(",shard=").append(shardId).append("]");
return builder.toString();
}

/**
 * Orders shard partitions by shard id first; ties, and comparisons against
 * any other kind of partition, fall back to the superclass ordering.
 *
 * NOTE(review): the shard id appears to be a String (the tests build it with
 * Integer.toString), in which case ids sort lexicographically, e.g. "10"
 * before "2" - confirm this is intended.
 *
 * @param o the definition to compare against
 * @return a negative, zero or positive value following the ordering above
 */
@Override
public int compareTo(PartitionDefinition o) {
    if (!(o instanceof ShardPartitionDefinition)) {
        return super.compareTo(o);
    }
    final ShardPartitionDefinition other = (ShardPartitionDefinition) o;
    final int byShard = shardId.compareTo(other.shardId);
    return (byShard != 0) ? byShard : super.compareTo(o);
}

/**
 * Equal when the other object has the same runtime class, passes the
 * superclass equality check and carries the same shard id (two null shard
 * ids count as equal).
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} describes the same shard partition
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass() || !super.equals(o)) {
        return false;
    }

    final ShardPartitionDefinition other = (ShardPartitionDefinition) o;
    if (shardId == null) {
        return other.shardId == null;
    }
    return shardId.equals(other.shardId);
}

/**
 * Extends the superclass hash with the shard id so that it stays in step
 * with {@link #equals(Object)}. A null shard id contributes 0.
 *
 * @return the combined hash code
 */
@Override
public int hashCode() {
    final int base = super.hashCode();
    final int shardHash = (shardId == null) ? 0 : shardId.hashCode();
    return 31 * base + shardHash;
}
}

public static class SlicePartitionDefinition extends PartitionDefinition {
Expand All @@ -126,6 +180,43 @@ public void write(DataOutput out) throws IOException {
out.writeInt(max);
}

/**
 * Orders slice partitions by {@code max}, then by {@code id}; ties, and
 * comparisons against any other kind of partition, fall back to the
 * superclass ordering.
 *
 * The fields are compared explicitly rather than subtracted: a subtraction
 * overflows (and flips the sign of the result) when the operands are far
 * apart, e.g. a negative value against a large positive one. Slice ids are
 * presumably non-negative, but nothing visible here enforces that.
 *
 * @param o the definition to compare against
 * @return a negative, zero or positive value following the ordering above
 */
@Override
public int compareTo(PartitionDefinition o) {
    if (o instanceof SlicePartitionDefinition) {
        SlicePartitionDefinition other = (SlicePartitionDefinition) o;
        int cmp = (max < other.max) ? -1 : ((max == other.max) ? 0 : 1);
        if (cmp != 0) {
            return cmp;
        }
        cmp = (id < other.id) ? -1 : ((id == other.id) ? 0 : 1);
        if (cmp != 0) {
            return cmp;
        }
    }
    return super.compareTo(o);
}

/**
 * Equal when the other object has the same runtime class, passes the
 * superclass equality check and has the same slice {@code id} and
 * {@code max}.
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} describes the same slice partition
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass() || !super.equals(o)) {
        return false;
    }

    final SlicePartitionDefinition other = (SlicePartitionDefinition) o;
    return id == other.id && max == other.max;
}

/**
 * Extends the superclass hash with the slice {@code id} and {@code max},
 * the same fields that {@link #equals(Object)} compares.
 *
 * @return the combined hash code
 */
@Override
public int hashCode() {
    final int withId = 31 * super.hashCode() + id;
    return 31 * withId + max;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down Expand Up @@ -323,16 +414,23 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
MappingUtils.validateMapping(settings.getScrollFields(), mapping, validation, log);
}
}
final List<PartitionDefinition> partitions;
if (version.onOrAfter(EsMajorVersion.V_5_X)) {
QueryBuilder query = QueryUtils.parseQuery(settings);
List<QueryBuilder> filters = QueryUtils.parseFilters(settings);
QueryBuilder mainQuery = query;
if (filters.isEmpty() == false) {
mainQuery = new BoolQueryBuilder().must(query).filters(filters);
}
return findSlicePartitions(settings, client.getRestClient(), mainQuery, result);
int maxDocsPerPartition = settings.getMaxDocsPerPartition();
String types = new Resource(settings, true).type();
partitions = findSlicePartitions(client.getRestClient(), types, mainQuery, maxDocsPerPartition,
result);
} else {
partitions = findShardPartitions(result);
}
return findShardPartitions(result);
Collections.shuffle(partitions);
return partitions;
} finally {
client.close();
}
Expand All @@ -343,7 +441,7 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
* @param shards The list of requested shards
* @return A list of {@link PartitionDefinition}, one per shard for each requested index.
*/
public static List<PartitionDefinition> findShardPartitions(List<List<Map<String, Object>>> shards) {
static List<PartitionDefinition> findShardPartitions(List<List<Map<String, Object>>> shards) {
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
for (List<Map<String, Object>> group : shards) {
for (Map<String, Object> replica : group) {
Expand All @@ -363,10 +461,11 @@ public static List<PartitionDefinition> findShardPartitions(List<List<Map<String
* @param shards The list of requested shards
* @return A list of {@link PartitionDefinition}
*/
public static List<PartitionDefinition> findSlicePartitions(Settings settings,
RestClient client,
QueryBuilder query,
List<List<Map<String, Object>>> shards) {
static List<PartitionDefinition> findSlicePartitions(RestClient client,
String types,
QueryBuilder query,
int maxDocsPerPartition,
List<List<Map<String, Object>>> shards) {
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
Set<String> indices = new HashSet<String>();
for (List<Map<String, Object>> group : shards) {
Expand All @@ -376,14 +475,12 @@ public static List<PartitionDefinition> findSlicePartitions(Settings settings,
break;
}
}
Resource res = new Resource(settings, true);
for (String index : indices) {
StringBuilder indexAndType = new StringBuilder(index);
if (StringUtils.hasLength(indexAndType)) {
if (StringUtils.hasLength(types)) {
indexAndType.append("/");
indexAndType.append(res.type());
indexAndType.append(types);
}
int maxDocsPerPartition = settings.getMaxDocsPerPartition();
// TODO applyAliasMetaData should be called in order to ensure that the count are exact (alias filters and routing may change the number of documents)
long numDocs = client.count(indexAndType.toString(), query);
int numPartitions = (int) Math.max(1, numDocs/maxDocsPerPartition);
Expand Down
128 changes: 128 additions & 0 deletions mr/src/test/java/org/elasticsearch/hadoop/rest/FindPartitionsTest.java
@@ -0,0 +1,128 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.rest;

import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.hadoop.rest.query.MatchAllQueryBuilder;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

/**
 * Tests for {@code RestService.findShardPartitions} and
 * {@code RestService.findSlicePartitions}, driven by the
 * {@code search-shards-response.json} fixture located next to this class.
 *
 * The fixture is expected to describe 15 shards for index1, 18 for index2
 * and 1 for index3 (34 in total), matching {@link #EXPECTED_SHARDS_PARTITIONS}.
 */
public class FindPartitionsTest {
    private static final ObjectMapper MAPPER =
            new ObjectMapper()
                    .configure(JsonParser.Feature.ALLOW_COMMENTS, true);

    /** The shard partitions expected from the fixture, in sorted order. */
    private static final RestService.PartitionDefinition[] EXPECTED_SHARDS_PARTITIONS;
    static {
        List<RestService.PartitionDefinition> expected =
                new ArrayList<RestService.PartitionDefinition>();
        for (int i = 0; i < 15; i++) {
            expected.add(new RestService.ShardPartitionDefinition("index1", Integer.toString(i)));
        }
        for (int i = 0; i < 18; i++) {
            expected.add(new RestService.ShardPartitionDefinition("index2", Integer.toString(i)));
        }
        for (int i = 0; i < 1; i++) {
            expected.add(new RestService.ShardPartitionDefinition("index3", Integer.toString(i)));
        }
        Collections.sort(expected);
        EXPECTED_SHARDS_PARTITIONS = expected.toArray(new RestService.PartitionDefinition[0]);
    }

    /** An empty shard list must yield no partitions from either strategy. */
    @Test
    public void testEmpty() {
        assertEquals(0, RestService.findShardPartitions(Collections.<List<Map<String,Object>>>emptyList()).size());
        // With no shards there should be no index to count, so the null client/query are never used.
        assertEquals(0, RestService.findSlicePartitions(null, null, null, 0,
                Collections.<List<Map<String,Object>>>emptyList()).size());
    }

    /** One distinct partition is expected per shard of every index in the fixture. */
    @Test
    public void testShardPartitions() throws IOException {
        List<List<Map<String, Object>>> shards = loadShards();
        List<RestService.PartitionDefinition> partitions = RestService.findShardPartitions(shards);
        // findPartitions shuffles its result, so compare in sorted order.
        Collections.sort(partitions);
        assertEquals(34, partitions.size());
        // Going through a set verifies that no partition is duplicated.
        assertEquals(34, new HashSet<RestService.PartitionDefinition>(partitions).size());
        assertArrayEquals(EXPECTED_SHARDS_PARTITIONS, partitions.toArray());
    }

    /**
     * The number of slices per index is max(1, numDocs / maxDocsPerPartition),
     * summed over the three indices of the fixture.
     */
    @Test
    public void testSlicePartitions() throws IOException {
        List<List<Map<String, Object>>> shards = loadShards();
        RestClient client = Mockito.mock(RestClient.class);
        MatchAllQueryBuilder query = new MatchAllQueryBuilder();
        // NOTE(review): the calls below pass a fresh MatchAllQueryBuilder, so these stubs
        // only match if MatchAllQueryBuilder implements equals() - confirm.
        Mockito.when(client.count("index1/type1", query)).thenReturn(1000L);
        Mockito.when(client.count("index2/type1", query)).thenReturn(10000L);
        Mockito.when(client.count("index3/type1", query)).thenReturn(100000L);

        // 1 + 10 + 100 slices
        assertSlicePartitionCount(client, shards, 1000, 111);
        // 10 + 100 + 1000 slices
        assertSlicePartitionCount(client, shards, 100, 1110);
        // Fewer documents than the limit still yields one slice per index.
        assertSlicePartitionCount(client, shards, Integer.MAX_VALUE, 3);

        Mockito.when(client.count("index1/type1", query)).thenReturn(0L);
        Mockito.when(client.count("index2/type1", query)).thenReturn(0L);
        Mockito.when(client.count("index3/type1", query)).thenReturn(0L);

        // Empty indices still get one slice each.
        assertSlicePartitionCount(client, shards, 100, 3);
    }

    /** Parses the search-shards fixture into the nested list/map structure used by RestService. */
    @SuppressWarnings("unchecked")
    private List<List<Map<String, Object>>> loadShards() throws IOException {
        return MAPPER.readValue(getClass().getResourceAsStream("search-shards-response.json"), ArrayList.class);
    }

    /**
     * Runs findSlicePartitions for type "type1" with a match-all query and checks
     * that it returns exactly {@code expectedCount} distinct partitions.
     */
    private static void assertSlicePartitionCount(RestClient client,
                                                  List<List<Map<String, Object>>> shards,
                                                  int maxDocsPerPartition,
                                                  int expectedCount) {
        List<RestService.PartitionDefinition> partitions = RestService.findSlicePartitions(client,
                "type1",
                new MatchAllQueryBuilder(),
                maxDocsPerPartition,
                shards);
        assertEquals(expectedCount, partitions.size());
        assertEquals(expectedCount, new HashSet<RestService.PartitionDefinition>(partitions).size());
    }
}

0 comments on commit e274b27

Please sign in to comment.