Skip to content

Commit

Permalink
introduced support for "shard_size" for terms & terms_stats facets. T…
Browse files Browse the repository at this point in the history
…he "shard_size" is the number of term entries each shard will send back to the coordinating node. "shard_size" > "size" will increase the accuracy (both in terms of the counts associated with each term and the terms that will actually be returned to the user) - of course, the higher "shard_size" is, the more expensive the processing becomes as bigger queues are maintained on a shard level and larger lists are streamed back from the shards.

closes #3821
  • Loading branch information
uboness committed Oct 2, 2013
1 parent cc6f8f6 commit f0ad41c
Show file tree
Hide file tree
Showing 22 changed files with 1,217 additions and 55 deletions.
Expand Up @@ -37,6 +37,7 @@ public class TermsFacetBuilder extends FacetBuilder {
private String fieldName;
private String[] fieldsNames;
private int size = 10;
private int shardSize = -1;
private Boolean allTerms;
private Object[] exclude;
private String regex;
Expand Down Expand Up @@ -124,6 +125,16 @@ public TermsFacetBuilder size(int size) {
return this;
}

/**
 * Sets the number of terms each shard will send back to the coordinating node ({@code shard_size}).
 * The higher the number, the more accurate the results will be — both the per-term counts and the
 * set of terms that is ultimately returned — at the cost of bigger per-shard queues and larger
 * lists streamed back from the shards. The shard size cannot be smaller than
 * {@link #size(int) size}, therefore in this case it will fall back and be treated as being equal to
 * size.
 */
public TermsFacetBuilder shardSize(int shardSize) {
    this.shardSize = shardSize;
    return this;
}

/**
* A regular expression to use in order to further filter terms.
*/
Expand Down Expand Up @@ -213,6 +224,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("field", fieldName);
}
builder.field("size", size);

// no point in sending shard size if it's not greater than size
if (shardSize > size) {
builder.field("shard_size", shardSize);
}

if (exclude != null) {
builder.startArray("exclude");
for (Object ex : exclude) {
Expand Down
Expand Up @@ -82,6 +82,7 @@ public FacetExecutor.Mode defaultGlobalMode() {
public FacetExecutor parse(String facetName, XContentParser parser, SearchContext context) throws IOException {
String field = null;
int size = 10;
int shardSize = -1;

String[] fieldsNames = null;
ImmutableSet<BytesRef> excluded = ImmutableSet.of();
Expand Down Expand Up @@ -124,6 +125,8 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
script = parser.text();
} else if ("size".equals(currentFieldName)) {
size = parser.intValue();
} else if ("shard_size".equals(currentFieldName)) {
shardSize = parser.intValue();
} else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) {
allTerms = parser.booleanValue();
} else if ("regex".equals(currentFieldName)) {
Expand Down Expand Up @@ -161,6 +164,11 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
searchScript = context.scriptService().search(context.lookup(), scriptLang, script, params);
}

// shard_size cannot be smaller than size as we need to at least fetch <size> entries from every shards in order to return <size>
if (shardSize < size) {
shardSize = size;
}

if (fieldsNames != null) {

// in case of multi files, we only collect the fields that are mapped and facet on them.
Expand All @@ -175,10 +183,10 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
// non of the fields is mapped
return new UnmappedFieldExecutor(size, comparatorType);
}
return new FieldsTermsStringFacetExecutor(facetName, mappers.toArray(new FieldMapper[mappers.size()]), size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new FieldsTermsStringFacetExecutor(mappers.toArray(new FieldMapper[mappers.size()]), size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
}
if (field == null && fieldsNames == null && script != null) {
return new ScriptTermsStringFieldFacetExecutor(size, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler());
return new ScriptTermsStringFieldFacetExecutor(size, shardSize, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler());
}

FieldMapper fieldMapper = context.smartNameFieldMapper(field);
Expand All @@ -190,17 +198,17 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
if (indexFieldData instanceof IndexNumericFieldData) {
IndexNumericFieldData indexNumericFieldData = (IndexNumericFieldData) indexFieldData;
if (indexNumericFieldData.getNumericType().isFloatingPoint()) {
return new TermsDoubleFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
return new TermsDoubleFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
} else {
return new TermsLongFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
return new TermsLongFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
}
} else {
if (script != null || "map".equals(executionHint)) {
return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
} else if (indexFieldData instanceof IndexFieldData.WithOrdinals) {
return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove);
return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove);
} else {
return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
}
}
}
Expand Down
Expand Up @@ -162,7 +162,13 @@ public long getOtherCount() {
public Facet reduce(ReduceContext context) {
List<Facet> facets = context.facets();
if (facets.size() == 1) {
return facets.get(0);
Facet facet = facets.get(0);

// can be of type InternalStringTermsFacet representing unmapped fields
if (facet instanceof InternalDoubleTermsFacet) {
((InternalDoubleTermsFacet) facet).trimExcessEntries();
}
return facet;
}

InternalDoubleTermsFacet first = null;
Expand Down Expand Up @@ -197,6 +203,25 @@ public Facet reduce(ReduceContext context) {
return first;
}

/**
 * Trims {@code entries} down to {@code requiredSize} (the "size" the user asked for). Needed on
 * the single-shard reduce path, where the one responding shard may have returned up to
 * {@code shard_size} entries — more than the user requested. Entries are assumed to already be
 * in the desired order, so trimming keeps the first {@code requiredSize} of them.
 */
private void trimExcessEntries() {
    // Nothing to trim — the shard returned no more than what was requested.
    if (requiredSize >= entries.size()) {
        return;
    }

    // Fast path for lists (e.g. fixed-size Arrays.asList results, whose iterators don't support
    // remove). Typed cast instead of the raw (List) cast to avoid an unchecked assignment.
    // NOTE(review): subList is a view that keeps the full backing list reachable — copy instead
    // if retaining the trimmed tail matters.
    if (entries instanceof List) {
        entries = ((List<DoubleEntry>) entries).subList(0, requiredSize);
        return;
    }

    // Generic fallback: drop every entry past the first requiredSize ones in place.
    int i = 0;
    for (Iterator<DoubleEntry> iter = entries.iterator(); iter.hasNext(); ) {
        iter.next();
        if (i++ >= requiredSize) {
            iter.remove();
        }
    }
}

static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
Expand Down
Expand Up @@ -53,17 +53,19 @@ public class TermsDoubleFacetExecutor extends FacetExecutor {
private final IndexNumericFieldData indexFieldData;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int shardSize;
private final SearchScript script;
private final ImmutableSet<BytesRef> excluded;

final Recycler.V<TDoubleIntHashMap> facets;
long missing;
long total;

public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, SearchScript script, CacheRecycler cacheRecycler) {
this.indexFieldData = indexFieldData;
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.excluded = excluded;
Expand Down Expand Up @@ -120,7 +122,7 @@ public InternalFacet buildFacet(String facetName) {
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalDoubleTermsFacet.DoubleEntry>of(), missing, total);
} else {
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator());
for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.insertWithOverflow(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
Expand All @@ -132,7 +134,7 @@ public InternalFacet buildFacet(String facetName) {
facets.release();
return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total);
} else {
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size);
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), shardSize);
for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
Expand Down
Expand Up @@ -163,7 +163,13 @@ public long getOtherCount() {
public Facet reduce(ReduceContext context) {
List<Facet> facets = context.facets();
if (facets.size() == 1) {
return facets.get(0);
Facet facet = facets.get(0);

// facet could be InternalStringTermsFacet representing unmapped fields
if (facet instanceof InternalLongTermsFacet) {
((InternalLongTermsFacet) facet).trimExcessEntries();
}
return facet;
}

InternalLongTermsFacet first = null;
Expand Down Expand Up @@ -198,6 +204,25 @@ public Facet reduce(ReduceContext context) {
return first;
}

/**
 * Trims {@code entries} down to {@code requiredSize} (the "size" the user asked for). Needed on
 * the single-shard reduce path, where the one responding shard may have returned up to
 * {@code shard_size} entries — more than the user requested. Entries are assumed to already be
 * in the desired order, so trimming keeps the first {@code requiredSize} of them.
 */
private void trimExcessEntries() {
    // Nothing to trim — the shard returned no more than what was requested.
    if (requiredSize >= entries.size()) {
        return;
    }

    // Fast path for lists (e.g. fixed-size Arrays.asList results, whose iterators don't support
    // remove). Typed cast instead of the raw (List) cast to avoid an unchecked assignment.
    // NOTE(review): subList is a view that keeps the full backing list reachable — copy instead
    // if retaining the trimmed tail matters.
    if (entries instanceof List) {
        entries = ((List<LongEntry>) entries).subList(0, requiredSize);
        return;
    }

    // Generic fallback: drop every entry past the first requiredSize ones in place.
    int i = 0;
    for (Iterator<LongEntry> iter = entries.iterator(); iter.hasNext(); ) {
        iter.next();
        if (i++ >= requiredSize) {
            iter.remove();
        }
    }
}

static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
Expand Down
Expand Up @@ -52,6 +52,7 @@ public class TermsLongFacetExecutor extends FacetExecutor {

private final IndexNumericFieldData indexFieldData;
private final TermsFacet.ComparatorType comparatorType;
private final int shardSize;
private final int size;
private final SearchScript script;
private final ImmutableSet<BytesRef> excluded;
Expand All @@ -60,10 +61,11 @@ public class TermsLongFacetExecutor extends FacetExecutor {
long missing;
long total;

public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, SearchScript script, CacheRecycler cacheRecycler) {
this.indexFieldData = indexFieldData;
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.excluded = excluded;
Expand Down Expand Up @@ -119,7 +121,7 @@ public InternalFacet buildFacet(String facetName) {
return new InternalLongTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalLongTermsFacet.LongEntry>of(), missing, total);
} else {
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator());
for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.insertWithOverflow(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
Expand All @@ -131,7 +133,7 @@ public InternalFacet buildFacet(String facetName) {
facets.release();
return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total);
} else {
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size);
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), shardSize);
for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
Expand Down
Expand Up @@ -41,15 +41,17 @@ public class FieldsTermsStringFacetExecutor extends FacetExecutor {

private final InternalStringTermsFacet.ComparatorType comparatorType;
private final int size;
private final int shardSize;
private final IndexFieldData[] indexFieldDatas;
private final SearchScript script;
private final HashedAggregator aggregator;
long missing;
long total;

public FieldsTermsStringFacetExecutor(String facetName, FieldMapper[] fieldMappers, int size, InternalStringTermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, Pattern pattern, SearchScript script) {
public FieldsTermsStringFacetExecutor(FieldMapper[] fieldMappers, int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType,
boolean allTerms, SearchContext context, ImmutableSet<BytesRef> excluded, Pattern pattern, SearchScript script) {
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.indexFieldDatas = new IndexFieldData[fieldMappers.length];
Expand Down Expand Up @@ -78,7 +80,7 @@ public Collector collector() {
@Override
public InternalFacet buildFacet(String facetName) {
try {
return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator);
return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator);
} finally {
aggregator.release();
}
Expand Down
Expand Up @@ -100,14 +100,13 @@ public static interface BytesRefCountIterator {
public boolean shared();
}

public static InternalFacet buildFacet(String facetName, int size, long missing, long total, TermsFacet.ComparatorType comparatorType,
public static InternalFacet buildFacet(String facetName, int size, int shardSize, long missing, long total, TermsFacet.ComparatorType comparatorType,
HashedAggregator aggregator) {
if (aggregator.isEmpty()) {
return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.TermEntry>of(),
missing, total);
return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.TermEntry>of(), missing, total);
} else {
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
if (shardSize < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator());
BytesRefCountIterator iter = aggregator.getIter();
while (iter.next() != null) {
ordered.insertWithOverflow(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count()));
Expand All @@ -120,8 +119,7 @@ public static InternalFacet buildFacet(String facetName, int size, long missing,
}
return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total);
} else {
BoundedTreeSet<InternalStringTermsFacet.TermEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.TermEntry>(
comparatorType.comparator(), size);
BoundedTreeSet<InternalStringTermsFacet.TermEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.TermEntry>(comparatorType.comparator(), shardSize);
BytesRefCountIterator iter = aggregator.getIter();
while (iter.next() != null) {
ordered.add(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count()));
Expand Down
Expand Up @@ -172,7 +172,9 @@ public long getOtherCount() {
public Facet reduce(ReduceContext context) {
List<Facet> facets = context.facets();
if (facets.size() == 1) {
return facets.get(0);
InternalStringTermsFacet facet = (InternalStringTermsFacet) facets.get(0);
facet.trimExcessEntries();
return facet;
}

InternalStringTermsFacet first = null;
Expand Down Expand Up @@ -215,6 +217,25 @@ public Facet reduce(ReduceContext context) {
return first;
}

/**
 * Trims {@code entries} down to {@code requiredSize} (the "size" the user asked for). Needed on
 * the single-shard reduce path, where the one responding shard may have returned up to
 * {@code shard_size} entries — more than the user requested. Entries are assumed to already be
 * in the desired order, so trimming keeps the first {@code requiredSize} of them.
 */
private void trimExcessEntries() {
    // Nothing to trim — the shard returned no more than what was requested.
    if (requiredSize >= entries.size()) {
        return;
    }

    // Fast path for lists (e.g. fixed-size Arrays.asList results, whose iterators don't support
    // remove). Typed cast instead of the raw (List) cast to avoid an unchecked assignment.
    // NOTE(review): subList is a view that keeps the full backing list reachable — copy instead
    // if retaining the trimmed tail matters.
    if (entries instanceof List) {
        entries = ((List<TermEntry>) entries).subList(0, requiredSize);
        return;
    }

    // Generic fallback: drop every entry past the first requiredSize ones in place.
    int i = 0;
    for (Iterator<TermEntry> iter = entries.iterator(); iter.hasNext(); ) {
        iter.next();
        if (i++ >= requiredSize) {
            iter.remove();
        }
    }
}

static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
Expand Down

0 comments on commit f0ad41c

Please sign in to comment.