From f0ad41cc1079553a0307e4bcc2032db9e96206b4 Mon Sep 17 00:00:00 2001 From: uboness Date: Wed, 2 Oct 2013 22:21:15 +0200 Subject: [PATCH] introduced support for "shard_size" for terms & terms_stats facets. The "shard_size" is the number of term entries each shard will send back to the coordinating node. "shard_size" > "size" will increase the accuracy (both in terms of the counts associated with each term and the terms that will actually be returned to the user) - of course, the higher "shard_size" is, the more expensive the processing becomes as bigger queues are maintained on a shard level and larger lists are streamed back from the shards. closes #3821 --- .../search/facet/terms/TermsFacetBuilder.java | 17 + .../search/facet/terms/TermsFacetParser.java | 22 +- .../doubles/InternalDoubleTermsFacet.java | 27 +- .../doubles/TermsDoubleFacetExecutor.java | 8 +- .../terms/longs/InternalLongTermsFacet.java | 27 +- .../terms/longs/TermsLongFacetExecutor.java | 8 +- .../FieldsTermsStringFacetExecutor.java | 8 +- .../facet/terms/strings/HashedAggregator.java | 12 +- .../strings/InternalStringTermsFacet.java | 23 +- .../ScriptTermsStringFieldFacetExecutor.java | 10 +- .../strings/TermsStringFacetExecutor.java | 6 +- .../TermsStringOrdinalsFacetExecutor.java | 10 +- .../termsstats/TermsStatsFacetBuilder.java | 14 + .../termsstats/TermsStatsFacetParser.java | 19 +- .../InternalTermsStatsDoubleFacet.java | 22 +- .../TermsStatsDoubleFacetExecutor.java | 6 +- .../longs/InternalTermsStatsLongFacet.java | 22 +- .../longs/TermsStatsLongFacetExecutor.java | 6 +- .../InternalTermsStatsStringFacet.java | 26 +- .../TermsStatsStringFacetExecutor.java | 8 +- .../facet/terms/ShardSizeTermsFacetTests.java | 423 ++++++++++++++ .../ShardSizeTermsStatsFacetTests.java | 548 ++++++++++++++++++ 22 files changed, 1217 insertions(+), 55 deletions(-) create mode 100644 src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java create mode 100644 
src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java diff --git a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java index 47f1daad655c0..88b28a5e6e5fc 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java @@ -37,6 +37,7 @@ public class TermsFacetBuilder extends FacetBuilder { private String fieldName; private String[] fieldsNames; private int size = 10; + private int shardSize = -1; private Boolean allTerms; private Object[] exclude; private String regex; @@ -124,6 +125,16 @@ public TermsFacetBuilder size(int size) { return this; } + /** + * Sets the number of terms that will be returned from each shard. The higher the number the more accurate the results will be. The + * shard size cannot be smaller than {@link #size(int) size}, therefore in this case it will fall back and be treated as being equal to + * size. + */ + public TermsFacetBuilder shardSize(int shardSize) { + this.shardSize = shardSize; + return this; + } + /** * A regular expression to use in order to further filter terms. 
*/ @@ -213,6 +224,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("field", fieldName); } builder.field("size", size); + + // no point in sending shard size if it's not greater than size + if (shardSize > size) { + builder.field("shard_size", shardSize); + } + if (exclude != null) { builder.startArray("exclude"); for (Object ex : exclude) { diff --git a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java index b8471ac7a2921..703f107a8fd74 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java @@ -82,6 +82,7 @@ public FacetExecutor.Mode defaultGlobalMode() { public FacetExecutor parse(String facetName, XContentParser parser, SearchContext context) throws IOException { String field = null; int size = 10; + int shardSize = -1; String[] fieldsNames = null; ImmutableSet excluded = ImmutableSet.of(); @@ -124,6 +125,8 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex script = parser.text(); } else if ("size".equals(currentFieldName)) { size = parser.intValue(); + } else if ("shard_size".equals(currentFieldName)) { + shardSize = parser.intValue(); } else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) { allTerms = parser.booleanValue(); } else if ("regex".equals(currentFieldName)) { @@ -161,6 +164,11 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex searchScript = context.scriptService().search(context.lookup(), scriptLang, script, params); } + // shard_size cannot be smaller than size, as we need to fetch at least `size` entries from every shard in order to return `size` entries + if (shardSize < size) { + shardSize = size; + } + if (fieldsNames != null) { // in case of multi files, we only collect the fields that are mapped and facet on 
them. @@ -175,10 +183,10 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex // non of the fields is mapped return new UnmappedFieldExecutor(size, comparatorType); } - return new FieldsTermsStringFacetExecutor(facetName, mappers.toArray(new FieldMapper[mappers.size()]), size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new FieldsTermsStringFacetExecutor(mappers.toArray(new FieldMapper[mappers.size()]), size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } if (field == null && fieldsNames == null && script != null) { - return new ScriptTermsStringFieldFacetExecutor(size, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler()); + return new ScriptTermsStringFieldFacetExecutor(size, shardSize, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler()); } FieldMapper fieldMapper = context.smartNameFieldMapper(field); @@ -190,17 +198,17 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex if (indexFieldData instanceof IndexNumericFieldData) { IndexNumericFieldData indexNumericFieldData = (IndexNumericFieldData) indexFieldData; if (indexNumericFieldData.getNumericType().isFloatingPoint()) { - return new TermsDoubleFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); + return new TermsDoubleFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); } else { - return new TermsLongFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); + return new TermsLongFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); } } else { if (script != null || 
"map".equals(executionHint)) { - return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } else if (indexFieldData instanceof IndexFieldData.WithOrdinals) { - return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove); + return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove); } else { - return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } } } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java index df260768495ad..10b59a990ff38 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java @@ -162,7 +162,13 @@ public long getOtherCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + Facet facet = facets.get(0); + + // can be of type InternalStringTermsFacet representing unmapped fields + if (facet instanceof InternalDoubleTermsFacet) { + ((InternalDoubleTermsFacet) facet).trimExcessEntries(); + } + return facet; } InternalDoubleTermsFacet first = null; @@ -197,6 +203,25 @@ public Facet reduce(ReduceContext context) { return first; } + private void 
trimExcessEntries() { + if (requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java index 86f9c24208f99..821dd23c2483c 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java @@ -53,6 +53,7 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { private final IndexNumericFieldData indexFieldData; private final TermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final SearchScript script; private final ImmutableSet excluded; @@ -60,10 +61,11 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { long missing; long total; - public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, SearchScript script, CacheRecycler cacheRecycler) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.excluded = excluded; @@ -120,7 +122,7 @@ public 
InternalFacet buildFacet(String facetName) { return new InternalDoubleTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value())); @@ -132,7 +134,7 @@ public InternalFacet buildFacet(String facetName) { facets.release(); return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java index 085a6cdf0a700..af89ec4d26a52 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java @@ -163,7 +163,13 @@ public long getOtherCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + Facet facet = facets.get(0); + + // facet could be InternalStringTermsFacet representing unmapped fields + if (facet instanceof InternalLongTermsFacet) { + ((InternalLongTermsFacet) facet).trimExcessEntries(); + } + return facet; } InternalLongTermsFacet first = null; @@ -198,6 +204,25 @@ public Facet 
reduce(ReduceContext context) { return first; } + private void trimExcessEntries() { + if (requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java index f9d5c6722aaa4..9c39268093154 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java @@ -52,6 +52,7 @@ public class TermsLongFacetExecutor extends FacetExecutor { private final IndexNumericFieldData indexFieldData; private final TermsFacet.ComparatorType comparatorType; + private final int shardSize; private final int size; private final SearchScript script; private final ImmutableSet excluded; @@ -60,10 +61,11 @@ public class TermsLongFacetExecutor extends FacetExecutor { long missing; long total; - public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, SearchScript script, CacheRecycler cacheRecycler) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.excluded = 
excluded; @@ -119,7 +121,7 @@ public InternalFacet buildFacet(String facetName) { return new InternalLongTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalLongTermsFacet.LongEntry(it.key(), it.value())); @@ -131,7 +133,7 @@ public InternalFacet buildFacet(String facetName) { facets.release(); return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java index 8ecd473c831e1..3f4178909252d 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java @@ -41,15 +41,17 @@ public class FieldsTermsStringFacetExecutor extends FacetExecutor { private final InternalStringTermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final IndexFieldData[] indexFieldDatas; private final SearchScript script; private final HashedAggregator aggregator; long missing; long total; - public FieldsTermsStringFacetExecutor(String facetName, 
FieldMapper[] fieldMappers, int size, InternalStringTermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, - ImmutableSet excluded, Pattern pattern, SearchScript script) { + public FieldsTermsStringFacetExecutor(FieldMapper[] fieldMappers, int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType, + boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, SearchScript script) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.indexFieldDatas = new IndexFieldData[fieldMappers.length]; @@ -78,7 +80,7 @@ public Collector collector() { @Override public InternalFacet buildFacet(String facetName) { try { - return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator); + return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator); } finally { aggregator.release(); } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java index 54ef8e9ae0fb9..7dff3883e738d 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java @@ -100,14 +100,13 @@ public static interface BytesRefCountIterator { public boolean shared(); } - public static InternalFacet buildFacet(String facetName, int size, long missing, long total, TermsFacet.ComparatorType comparatorType, + public static InternalFacet buildFacet(String facetName, int size, int shardSize, long missing, long total, TermsFacet.ComparatorType comparatorType, HashedAggregator aggregator) { if (aggregator.isEmpty()) { - return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.of(), - missing, total); + return new InternalStringTermsFacet(facetName, 
comparatorType, size, ImmutableList.of(), missing, total); } else { - if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + if (shardSize < EntryPriorityQueue.LIMIT) { + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); BytesRefCountIterator iter = aggregator.getIter(); while (iter.next() != null) { ordered.insertWithOverflow(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count())); @@ -120,8 +119,7 @@ public static InternalFacet buildFacet(String facetName, int size, long missing, } return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet( - comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); BytesRefCountIterator iter = aggregator.getIter(); while (iter.next() != null) { ordered.add(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java index 03769e1879d7c..491232ab12321 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java @@ -172,7 +172,9 @@ public long getOtherCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + InternalStringTermsFacet facet = (InternalStringTermsFacet) facets.get(0); + facet.trimExcessEntries(); + return facet; } InternalStringTermsFacet first = null; @@ -215,6 +217,25 @@ public Facet reduce(ReduceContext context) { return first; } + private void trimExcessEntries() { + if (requiredSize >= entries.size()) { 
+ return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java index b4d6966b9334f..a2201a9b7f109 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java @@ -48,6 +48,7 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { private final InternalStringTermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final SearchScript script; private final Matcher matcher; private final ImmutableSet excluded; @@ -57,10 +58,11 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { long missing; long total; - public ScriptTermsStringFieldFacetExecutor(int size, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context, + public ScriptTermsStringFieldFacetExecutor(int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context, ImmutableSet excluded, Pattern pattern, String scriptLang, String script, Map params, CacheRecycler cacheRecycler) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.numberOfShards = context.numberOfShards(); this.script = context.scriptService().search(context.lookup(), scriptLang, script, params); @@ 
-82,8 +84,8 @@ public InternalFacet buildFacet(String facetName) { facets.release(); return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { - if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + if (shardSize < EntryPriorityQueue.LIMIT) { + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TObjectIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalStringTermsFacet.TermEntry(it.key(), it.value())); @@ -95,7 +97,7 @@ public InternalFacet buildFacet(String facetName) { facets.release(); return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TObjectIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalStringTermsFacet.TermEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java index 101928d3fb8ab..7dfc7fe538361 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java @@ -44,6 +44,7 @@ public class TermsStringFacetExecutor extends FacetExecutor { private final TermsFacet.ComparatorType comparatorType; private final SearchScript script; + private final int shardSize; private final int size; // the aggregation map @@ -52,10 +53,11 @@ public class TermsStringFacetExecutor extends FacetExecutor { private final boolean allTerms; private final HashedAggregator 
aggregator; - public TermsStringFacetExecutor(IndexFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsStringFacetExecutor(IndexFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, SearchScript script) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.allTerms = allTerms; @@ -79,7 +81,7 @@ public Collector collector() { @Override public InternalFacet buildFacet(String facetName) { try { - return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator); + return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator); } finally { aggregator.release(); } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java index 72d5e09e1a34b..1fb6d1edc3c59 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java @@ -56,6 +56,7 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { final CacheRecycler cacheRecycler; private final TermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final int minCount; private final ImmutableSet excluded; private final Matcher matcher; @@ -65,10 +66,11 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { long missing; long total; - public TermsStringOrdinalsFacetExecutor(IndexFieldData.WithOrdinals indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, 
SearchContext context, + public TermsStringOrdinalsFacetExecutor(IndexFieldData.WithOrdinals indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, int ordinalsCacheAbove) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.ordinalsCacheAbove = ordinalsCacheAbove; @@ -107,9 +109,9 @@ public InternalFacet buildFacet(String facetName) { } // YACK, we repeat the same logic, but once with an optimizer priority queue for smaller sizes - if (size < EntryPriorityQueue.LIMIT) { + if (shardSize < EntryPriorityQueue.LIMIT) { // optimize to use priority size - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); while (queue.size() > 0) { ReaderAggregator agg = queue.top(); @@ -149,7 +151,7 @@ public InternalFacet buildFacet(String facetName) { return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); while (queue.size() > 0) { ReaderAggregator agg = queue.top(); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java index b2ff0976eeb83..7a58f680dd331 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java @@ -36,6 +36,7 @@ public class TermsStatsFacetBuilder extends FacetBuilder { private String keyField; private String valueField; private int size = -1; + private int shardSize = -1; private 
TermsStatsFacet.ComparatorType comparatorType; private String script; @@ -75,6 +76,16 @@ public TermsStatsFacetBuilder size(int size) { return this; } + /** + * Sets the number of terms that will be returned from each shard. The higher the number the more accurate the results will be. The + * shard size cannot be smaller than {@link #size(int) size}, therefore in this case it will fall back and be treated as being equal to + * size. + */ + public TermsStatsFacetBuilder shardSize(int shardSize) { + this.shardSize = shardSize; + return this; + } + /** * Marks all terms to be returned, even ones with 0 counts. */ @@ -146,6 +157,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (size != -1) { builder.field("size", size); } + if (shardSize > size) { + builder.field("shard_size", shardSize); + } builder.endObject(); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java index 3ccba3b3f4692..9bb1148cf5daa 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java @@ -67,6 +67,7 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex String keyField = null; String valueField = null; int size = 10; + int shardSize = -1; TermsStatsFacet.ComparatorType comparatorType = TermsStatsFacet.ComparatorType.COUNT; String scriptLang = null; String script = null; @@ -86,20 +87,20 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex keyField = parser.text(); } else if ("value_field".equals(currentFieldName) || "valueField".equals(currentFieldName)) { valueField = parser.text(); - } else if ("script_field".equals(currentFieldName)) { + } else if ("script_field".equals(currentFieldName) || "scriptField".equals(currentFieldName)) { script = 
parser.text(); - } else if ("value_script".equals(currentFieldName)) { + } else if ("value_script".equals(currentFieldName) || "valueScript".equals(currentFieldName)) { script = parser.text(); } else if ("size".equals(currentFieldName)) { size = parser.intValue(); + } else if ("shard_size".equals(currentFieldName) || "shardSize".equals(currentFieldName)) { + shardSize = parser.intValue(); } else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) { if (parser.booleanValue()) { size = 0; // indicates all terms } } else if ("order".equals(currentFieldName) || "comparator".equals(currentFieldName)) { comparatorType = TermsStatsFacet.ComparatorType.fromString(parser.text()); - } else if ("value_script".equals(currentFieldName)) { - script = parser.text(); } else if ("lang".equals(currentFieldName)) { scriptLang = parser.text(); } @@ -119,6 +120,10 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex } IndexFieldData keyIndexFieldData = context.fieldData().getForField(keyMapper); + if (shardSize < size) { + shardSize = size; + } + IndexNumericFieldData valueIndexFieldData = null; SearchScript valueScript = null; if (valueField != null) { @@ -136,11 +141,11 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex if (keyIndexFieldData instanceof IndexNumericFieldData) { IndexNumericFieldData keyIndexNumericFieldData = (IndexNumericFieldData) keyIndexFieldData; if (keyIndexNumericFieldData.getNumericType().isFloatingPoint()) { - return new TermsStatsDoubleFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new TermsStatsDoubleFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } else { - return new TermsStatsLongFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new 
TermsStatsLongFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } } - return new TermsStatsStringFacetExecutor(keyIndexFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new TermsStatsStringFacetExecutor(keyIndexFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java index 24b56b4c0e9a5..6a5ca0bba9565 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java @@ -172,14 +172,15 @@ public long getMissingCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! 
- InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } + tsFacet.trimExcessEntries(); return facets.get(0); } int missing = 0; @@ -228,6 +229,25 @@ public Facet reduce(ReduceContext context) { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java index 933cb31970740..e65cb134278e9 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java @@ -48,13 +48,15 @@ public class TermsStatsDoubleFacetExecutor extends FacetExecutor { final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsDoubleFacetExecutor(IndexNumericFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript script, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.size = size; + this.shardSize = shardSize; this.comparatorType = 
comparatorType; this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; @@ -81,7 +83,7 @@ public InternalFacet buildFacet(String facetName) { Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - int limit = size; + int limit = shardSize; List ordered = Lists.newArrayList(); for (int i = 0; i < limit; i++) { InternalTermsStatsDoubleFacet.DoubleEntry value = (InternalTermsStatsDoubleFacet.DoubleEntry) values[i]; diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java index f4a70153dcdfb..5e02f3adb3ef8 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java @@ -172,14 +172,15 @@ public long getMissingCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! 
- InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } + tsFacet.trimExcessEntries(); return facets.get(0); } int missing = 0; @@ -228,6 +229,25 @@ public Facet reduce(ReduceContext context) { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java index 776fc8d029737..fe3451296f845 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java @@ -49,13 +49,15 @@ public class TermsStatsLongFacetExecutor extends FacetExecutor { final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsLongFacetExecutor(IndexNumericFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript script, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; 
this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; @@ -84,7 +86,7 @@ public InternalFacet buildFacet(String facetName) { Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - int limit = size; + int limit = shardSize; List ordered = Lists.newArrayList(); for (int i = 0; i < limit; i++) { InternalTermsStatsLongFacet.LongEntry value = (InternalTermsStatsLongFacet.LongEntry) values[i]; diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java index 2e56ca91c7678..667162a440f9e 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java @@ -177,15 +177,16 @@ public long getMissingCount() { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! 
- InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } - return facets.get(0); + tsFacet.trimExcessEntries(); + return tsFacet; } int missing = 0; Recycler.V> map = context.cacheRecycler().hashMap(-1); @@ -220,7 +221,7 @@ public Facet reduce(ReduceContext context) { } else { Object[] values = map.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - List ordered = new ArrayList(map.v().size()); + List ordered = new ArrayList(Math.min(map.v().size(), requiredSize)); for (int i = 0; i < requiredSize; i++) { StringEntry value = (StringEntry) values[i]; if (value == null) { @@ -233,6 +234,25 @@ public Facet reduce(ReduceContext context) { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java index eb7368540b048..f389239ae7432 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java @@ -51,16 +51,18 @@ public class TermsStatsStringFacetExecutor extends FacetExecutor { final IndexNumericFieldData 
valueIndexFieldData; final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsStringFacetExecutor(IndexFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript valueScript, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; this.script = valueScript; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.entries = context.cacheRecycler().hashMap(-1); @@ -79,13 +81,13 @@ public InternalFacet buildFacet(String facetName) { } if (size == 0) { // all terms // all terms, just return the collection, we will sort it on the way back - return new InternalTermsStatsStringFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.v().values(), missing); + return new InternalTermsStatsStringFacet(facetName, comparatorType, 0/* indicates all terms*/, entries.v().values(), missing); } Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); List ordered = Lists.newArrayList(); - int limit = size; + int limit = shardSize; for (int i = 0; i < limit; i++) { InternalTermsStatsStringFacet.StringEntry value = (InternalTermsStatsStringFacet.StringEntry) values[i]; if (value == null) { diff --git a/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java b/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java new file mode 100644 index 0000000000000..54d9eea5bb155 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java @@ -0,0 +1,423 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.facet.terms; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.facet.Facets; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.facet.FacetBuilders.termsFacet; +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.SUITE) +public class ShardSizeTermsFacetTests extends AbstractIntegrationTest { + + /** + * to properly test the effect/functionality of shard_size, we need to force having 2 shards and also + * control the routing such that certain documents will end on each shard. 
Using "djb" routing hash + ignoring the + * doc type when hashing will ensure that docs with routing value "1" will end up in a different shard than docs with + * routing value "2". + */ + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return randomSettingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + .put("cluster.routing.operation.hash.type", "djb") + .put("cluster.routing.operation.use_type", "false") + .build(); + } + + @Test + public void noShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 4) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 
'size' param) + Map expected = ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 5) + .put("2", 4) + .put("3", 3) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_withExecutionHintMap() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).executionHint("map").order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = 
ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_withExecutionHintMap_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).executionHint("map").order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 5) + .put("2", 4) + .put("3", 3) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 4) + .build(); + for (TermsFacet.Entry entry : entries) { + 
assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 5) + .put(2, 4) + .put(3, 3) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double() throws 
Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 4) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = 
client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 5) + .put(2, 4) + .put(3, 3) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + private void indexData() throws Exception { + + /* + + + || || size = 3, shard_size = 5 || shard_size = size = 3 || + ||==========||==================================================||===============================================|| + || shard 1: || "1" - 5 | "2" - 4 | "3" - 3 | "4" - 2 | "5" - 1 || "1" - 5 | "3" - 3 | "2" - 4 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || shard 2: || "1" - 3 | "2" - 1 | "3" - 5 | "4" - 2 | "5" - 1 || "1" - 3 | "3" - 5 | "4" - 2 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || reduced: || "1" - 8 | "2" - 5 | "3" - 8 | "4" - 4 | "5" - 2 || || + || || || "1" - 8, "3" - 8, "2" - 4 <= WRONG || + || || "1" - 8 | "3" - 8 | "2" - 5 <= CORRECT || || + + + */ + + + indexDoc("1", "1", 5); + indexDoc("1", "2", 4); + indexDoc("1", "3", 3); + indexDoc("1", "4", 2); + indexDoc("1", "5", 1); + + // total docs in shard "1" = 15 + + indexDoc("2", "1", 3); + indexDoc("2", "2", 1); + indexDoc("2", "3", 5); + indexDoc("2", "4", 2); + indexDoc("2", "5", 1); + + // total docs in shard "2" = 12 + + client().admin().indices().prepareFlush("idx").execute().actionGet(); + 
client().admin().indices().prepareRefresh("idx").execute().actionGet(); + + long totalOnOne = client().prepareSearch("idx").setTypes("type").setRouting("1").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnOne, is(15l)); + long totalOnTwo = client().prepareSearch("idx").setTypes("type").setRouting("2").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnTwo, is(12l)); + } + + private void indexDoc(String shard, String key, int times) throws Exception { + for (int i = 0; i < times; i++) { + client().prepareIndex("idx", "type").setRouting(shard).setCreate(true).setSource(jsonBuilder() + .startObject() + .field("key", key) + .endObject()).execute().actionGet(); + } + } + +} diff --git a/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java b/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java new file mode 100644 index 0000000000000..83c5fe5628aaa --- /dev/null +++ b/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java @@ -0,0 +1,548 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.facet.termsstats; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.facet.Facets; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.facet.FacetBuilders.termsStatsFacet; +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.SUITE) +public class ShardSizeTermsStatsFacetTests extends AbstractIntegrationTest { + + /** + * to properly test the effect/functionality of shard_size, we need to force having 2 shards and also + * control the routing such that certain documents will end on each shard. Using "djb" routing hash + ignoring the + * doc type when hashing will ensure that docs with routing value "1" will end up in a different shard than docs with + * routing value "2". 
+ */ + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return randomSettingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + .put("cluster.routing.operation.hash.type", "djb") + .put("cluster.routing.operation.use_type", "false") + .build(); + } + + @Test + public void noShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_string_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .put("4", 4l) + .put("5", 2l) + .build(); + for 
(TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .put("4", 4l) + .put("5", 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void 
withShardSize_string_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 5l) + .put("2", 4l) + .put("3", 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_long_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse 
response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + 
.addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 5l) + .put(2, 4l) + .put(3, 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + 
TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + 
.put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 5l) + .put(2, 4l) + .put(3, 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + 
+ private void indexData() throws Exception { + + /* + + + || || size = 3, shard_size = 5 || shard_size = size = 3 || + ||==========||==================================================||===============================================|| + || shard 1: || "1" - 5 | "2" - 4 | "3" - 3 | "4" - 2 | "5" - 1 || "1" - 5 | "3" - 3 | "2" - 4 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || shard 2: || "1" - 3 | "2" - 1 | "3" - 5 | "4" - 2 | "5" - 1 || "1" - 3 | "3" - 5 | "4" - 2 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || reduced: || "1" - 8 | "2" - 5 | "3" - 8 | "4" - 4 | "5" - 2 || || + || || || "1" - 8, "3" - 8, "2" - 4 <= WRONG || + || || "1" - 8 | "3" - 8 | "2" - 5 <= CORRECT || || + + + */ + + + indexDoc("1", "1", 5); + indexDoc("1", "2", 4); + indexDoc("1", "3", 3); + indexDoc("1", "4", 2); + indexDoc("1", "5", 1); + + // total docs in shard "1" = 15 + + indexDoc("2", "1", 3); + indexDoc("2", "2", 1); + indexDoc("2", "3", 5); + indexDoc("2", "4", 2); + indexDoc("2", "5", 1); + + // total docs in shard "2" = 12 + + client().admin().indices().prepareFlush("idx").execute().actionGet(); + client().admin().indices().prepareRefresh("idx").execute().actionGet(); + + long totalOnOne = client().prepareSearch("idx").setTypes("type").setRouting("1").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnOne, is(15l)); + long totalOnTwo = client().prepareSearch("idx").setTypes("type").setRouting("2").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnTwo, is(12l)); + } + + private void indexDoc(String shard, String key, int times) throws Exception { + for (int i = 0; i < times; i++) { + client().prepareIndex("idx", "type").setRouting(shard).setCreate(true).setSource(jsonBuilder() + .startObject() + .field("key", key) + .field("value", 1) + 
.endObject()).execute().actionGet(); + } + } + +}