Handle optional term and field statistics gracefully
Lucene provides a set of statistics that depend on the codec / postings format
as well as on the index options used when the field is created / indexed.
If a certain stats value is not available, Lucene returns `-1` instead of the
actual value. We need to ensure that those values are encoded correctly when
we write them as vLongs, as well as when we aggregate them.

Closes elastic#3012
s1monw committed May 8, 2013
1 parent 6997fee commit f5e8f49
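
For context, a minimal Lucene 4.x sketch (hypothetical index and field names) of where the -1 sentinel comes from: a field indexed without term frequencies, e.g. with IndexOptions.DOCS_ONLY, reports totalTermFreq and sumTotalTermFreq as -1.

import java.io.IOException;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;

public class OptionalStatsDemo {
    // reader is assumed to be open over an index whose "tags" field was
    // indexed with IndexOptions.DOCS_ONLY, i.e. without term frequencies
    static void printStats(IndexReader reader) throws IOException {
        Term term = new Term("tags", "lucene");
        System.out.println("docFreq          = " + reader.docFreq(term));        // always available
        System.out.println("totalTermFreq    = " + reader.totalTermFreq(term));  // -1: freqs not indexed
        Terms terms = MultiFields.getTerms(reader, "tags");
        System.out.println("sumTotalTermFreq = " + terms.getSumTotalTermFreq()); // -1 likewise
    }
}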
Showing 6 changed files with 343 additions and 97 deletions.
92 changes: 92 additions & 0 deletions src/main/java/org/elasticsearch/common/collect/XMaps.java
@@ -0,0 +1,92 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.collect;

import gnu.trove.impl.Constants;

import java.util.Map;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.trove.ExtTHashMap;

import com.google.common.collect.ForwardingMap;

/**
* This class provides factory methods for Maps. The returned {@link Map}
* instances are general purpose maps and none of the methods guarantees a
* concrete implementation unless the return type is a concrete type. The
* implementations used might change over time; if you rely on a specific
* implementation you should use a concrete constructor.
*/
public final class XMaps {

public static final int DEFAULT_CAPACITY = Constants.DEFAULT_CAPACITY;

/**
* Returns a new map with the given initial capacity
*/
public static <K, V> Map<K, V> newMap(int capacity) {
return new ExtTHashMap<K, V>(capacity, Constants.DEFAULT_LOAD_FACTOR);
}

/**
* Returns a new map with a default initial capacity of
* {@value #DEFAULT_CAPACITY}
*/
public static <K, V> Map<K, V> newMap() {
return newMap(DEFAULT_CAPACITY);
}

/**
* Returns a map like {@link #newMap()} that does not accept <code>null</code> keys
*/
public static <K, V> Map<K, V> newNoNullKeysMap() {
Map<K, V> delegate = newMap();
return ensureNoNullKeys(delegate);
}

/**
* Returns a map like {@link #newMap(int)} that does not accept <code>null</code> keys
*/
public static <K, V> Map<K, V> newNoNullKeysMap(int capacity) {
Map<K, V> delegate = newMap(capacity);
return ensureNoNullKeys(delegate);
}

/**
* Wraps the given map and prevents the addition of <code>null</code> keys.
*/
public static <K, V> Map<K, V> ensureNoNullKeys(final Map<K, V> delegate) {
return new ForwardingMap<K, V>() {
@Override
public V put(K key, V value) {
if (key == null) {
throw new ElasticSearchIllegalArgumentException("Map key must not be null");
}
return super.put(key, value);
}

@Override
protected Map<K, V> delegate() {
return delegate;
}
};
}
}
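
As a usage sketch (hypothetical keys and values), the wrapper returned by ensureNoNullKeys rejects null keys at put time:

import java.util.Map;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.XMaps;

public class XMapsDemo {
    public static void main(String[] args) {
        Map<String, Long> stats = XMaps.newNoNullKeysMap();
        stats.put("body", 42L);              // accepted
        try {
            stats.put(null, 1L);             // rejected by the ForwardingMap wrapper
        } catch (ElasticSearchIllegalArgumentException e) {
            System.out.println(e.getMessage()); // "Map key must not be null"
        }
    }
}

Note that only put is overridden; putAll on the wrapper is forwarded straight to the delegate, so bulk insertion would bypass the null-key check.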
src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
@@ -21,19 +21,16 @@

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
-import gnu.trove.impl.Constants;
-import gnu.trove.map.TMap;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.ShardFieldDocSortedHitQueue;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
Expand All @@ -49,9 +46,6 @@
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
-import org.elasticsearch.search.suggest.Suggest.Suggestion;
-import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
-import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry.Option;

import java.util.ArrayList;
import java.util.Collection;
@@ -90,27 +84,37 @@ public boolean optimizeSingleShard() {
}

public AggregatedDfs aggregateDfs(Iterable<DfsSearchResult> results) {
-TMap<Term, TermStatistics> termStatistics = new ExtTHashMap<Term, TermStatistics>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR);
-TMap<String, CollectionStatistics> fieldStatistics = new ExtTHashMap<String, CollectionStatistics>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR);
+Map<Term, TermStatistics> termStatistics = XMaps.newNoNullKeysMap();
+Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
long aggMaxDoc = 0;
for (DfsSearchResult result : results) {
-for (int i = 0; i < result.termStatistics().length; i++) {
-TermStatistics existing = termStatistics.get(result.terms()[i]);
+final Term[] terms = result.terms();
+final TermStatistics[] stats = result.termStatistics();
+assert terms.length == stats.length;
+for (int i = 0; i < terms.length; i++) {
+assert terms[i] != null;
+TermStatistics existing = termStatistics.get(terms[i]);
if (existing != null) {
-termStatistics.put(result.terms()[i], new TermStatistics(existing.term(), existing.docFreq() + result.termStatistics()[i].docFreq(), existing.totalTermFreq() + result.termStatistics()[i].totalTermFreq()));
+assert terms[i].bytes().equals(existing.term());
+// totalTermFreq is an optional statistic: if either side reports -1
+// (not available), the merged value must be set to -1 as well
+termStatistics.put(terms[i], new TermStatistics(existing.term(),
+existing.docFreq() + stats[i].docFreq(),
+optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
} else {
-termStatistics.put(result.terms()[i], result.termStatistics()[i]);
+termStatistics.put(terms[i], stats[i]);
}

}
for (Map.Entry<String, CollectionStatistics> entry : result.fieldStatistics().entrySet()) {
+assert entry.getKey() != null;
CollectionStatistics existing = fieldStatistics.get(entry.getKey());
if (existing != null) {
CollectionStatistics merged = new CollectionStatistics(
entry.getKey(), existing.maxDoc() + entry.getValue().maxDoc(),
-existing.docCount() + entry.getValue().docCount(),
-existing.sumTotalTermFreq() + entry.getValue().sumTotalTermFreq(),
-existing.sumDocFreq() + entry.getValue().sumDocFreq()
+optionalSum(existing.docCount(), entry.getValue().docCount()),
+optionalSum(existing.sumTotalTermFreq(), entry.getValue().sumTotalTermFreq()),
+optionalSum(existing.sumDocFreq(), entry.getValue().sumDocFreq())
);
fieldStatistics.put(entry.getKey(), merged);
} else {
@@ -121,6 +125,10 @@ public AggregatedDfs aggregateDfs(Iterable<DfsSearchResult> results) {
}
return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
}

+private static long optionalSum(long left, long right) {
+return Math.min(left, right) == -1 ? -1 : left + right;
+}

public ShardDoc[] sortDocs(Collection<? extends QuerySearchResultProvider> results1) {
if (results1.isEmpty()) {
@@ -267,7 +275,7 @@ public ShardDoc[] sortDocs(Collection<? extends QuerySearchResultProvider> resul
}

public Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad(ShardDoc[] shardDocs) {
-Map<SearchShardTarget, ExtTIntArrayList> result = Maps.newHashMap();
+Map<SearchShardTarget, ExtTIntArrayList> result = XMaps.newMap();
for (ShardDoc shardDoc : shardDocs) {
ExtTIntArrayList list = result.get(shardDoc.shardTarget());
if (list == null) {
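
To make the merge semantics above concrete, a small sketch of how optionalSum propagates the not-available marker: a single shard reporting -1 forces the aggregated statistic to -1, rather than polluting the sum with a bogus negative contribution.

public class OptionalSumDemo {
    // same logic as the optionalSum helper added above
    static long optionalSum(long left, long right) {
        return Math.min(left, right) == -1 ? -1 : left + right;
    }

    public static void main(String[] args) {
        assert optionalSum(10, 32) == 42;  // both shards reported the statistic
        assert optionalSum(10, -1) == -1;  // one shard could not provide it
        assert optionalSum(-1, -1) == -1;  // not available anywhere
    }
}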
52 changes: 17 additions & 35 deletions src/main/java/org/elasticsearch/search/dfs/AggregatedDfs.java
@@ -19,43 +19,38 @@

package org.elasticsearch.search.dfs;

-import gnu.trove.impl.Constants;
-import gnu.trove.map.TMap;

+import java.io.IOException;
+import java.util.Map;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
+import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.trove.ExtTHashMap;

-import java.io.IOException;
-import java.util.Map;

/**
*
*/
public class AggregatedDfs implements Streamable {

-private TMap<Term, TermStatistics> termStatistics;
-private TMap<String, CollectionStatistics> fieldStatistics;
+private Map<Term, TermStatistics> termStatistics;
+private Map<String, CollectionStatistics> fieldStatistics;
private long maxDoc;

private AggregatedDfs() {

}

-public AggregatedDfs(TMap<Term, TermStatistics> termStatistics, TMap<String, CollectionStatistics> fieldStatistics, long maxDoc) {
+public AggregatedDfs(Map<Term, TermStatistics> termStatistics, Map<String, CollectionStatistics> fieldStatistics, long maxDoc) {
this.termStatistics = termStatistics;
this.fieldStatistics = fieldStatistics;
this.maxDoc = maxDoc;
}

-public TMap<Term, TermStatistics> termStatistics() {
+public Map<Term, TermStatistics> termStatistics() {
return termStatistics;
}

-public TMap<String, CollectionStatistics> fieldStatistics() {
+public Map<String, CollectionStatistics> fieldStatistics() {
return fieldStatistics;
}

@@ -72,19 +67,15 @@ public static AggregatedDfs readAggregatedDfs(StreamInput in) throws IOException
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
-termStatistics = new ExtTHashMap<Term, TermStatistics>(size, Constants.DEFAULT_LOAD_FACTOR);
+termStatistics = XMaps.newMap(size);
for (int i = 0; i < size; i++) {
Term term = new Term(in.readString(), in.readBytesRef());
-TermStatistics stats = new TermStatistics(in.readBytesRef(), in.readVLong(), in.readVLong());
+TermStatistics stats = new TermStatistics(in.readBytesRef(),
+in.readVLong(),
+DfsSearchResult.toNotAvailable(in.readVLong()));
termStatistics.put(term, stats);
}
-size = in.readVInt();
-fieldStatistics = new ExtTHashMap<String, CollectionStatistics>(size, Constants.DEFAULT_LOAD_FACTOR);
-for (int i = 0; i < size; i++) {
-String field = in.readString();
-CollectionStatistics stats = new CollectionStatistics(field, in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
-fieldStatistics.put(field, stats);
-}
+fieldStatistics = DfsSearchResult.readFieldStats(in);
maxDoc = in.readVLong();
}

@@ -98,18 +89,9 @@ public void writeTo(final StreamOutput out) throws IOException {
TermStatistics stats = termTermStatisticsEntry.getValue();
out.writeBytesRef(stats.term());
out.writeVLong(stats.docFreq());
-out.writeVLong(stats.totalTermFreq());
+out.writeVLong(DfsSearchResult.makePositive(stats.totalTermFreq()));
}

-out.writeVInt(fieldStatistics.size());
-for (Map.Entry<String, CollectionStatistics> entry : fieldStatistics.entrySet()) {
-out.writeString(entry.getKey());
-out.writeVLong(entry.getValue().maxDoc());
-out.writeVLong(entry.getValue().docCount());
-out.writeVLong(entry.getValue().sumTotalTermFreq());
-out.writeVLong(entry.getValue().sumDocFreq());
-}

+DfsSearchResult.writeFieldStats(out, fieldStatistics);
out.writeVLong(maxDoc);
}
}
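
The writeTo/readFrom changes lean on two DfsSearchResult helpers whose diff is not part of this excerpt. vLong encoding only supports non-negative values, so -1 cannot be written directly; a plausible reading of makePositive/toNotAvailable, hedged since that file is not shown here, is a shift by one:

// Presumed shape of the DfsSearchResult helpers (assumption, not shown in this
// excerpt): shift the value so the -1 "not available" marker becomes 0, which
// vLong can encode, and undo the shift when reading.
public static long makePositive(long value) {
    assert value >= -1 : value;   // statistics are counts, or -1 for "not available"
    return value + 1;             // -1 -> 0, n -> n + 1
}

public static long toNotAvailable(long value) {
    return value - 1;             // inverse of makePositive on the read side
}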
22 changes: 15 additions & 7 deletions src/main/java/org/elasticsearch/search/dfs/DfsPhase.java
@@ -20,14 +20,13 @@
package org.elasticsearch.search.dfs;

import com.google.common.collect.ImmutableMap;
-import gnu.trove.map.TMap;
import gnu.trove.set.hash.THashSet;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
-import org.elasticsearch.common.trove.ExtTHashMap;
+import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase;
@@ -57,18 +56,21 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext context) {
+THashSet<Term> termsSet = null;
try {
if (!context.queryRewritten()) {
context.updateRewriteQuery(context.searcher().rewrite(context.query()));
}

-THashSet<Term> termsSet = cachedTermsSet.get().get();
-termsSet.clear();
+termsSet = cachedTermsSet.get().get();
+if (!termsSet.isEmpty()) {
+termsSet.clear();
+}
context.query().extractTerms(termsSet);
if (context.rescore() != null) {
context.rescore().rescorer().extractTerms(context, context.rescore(), termsSet);
}

Term[] terms = termsSet.toArray(new Term[termsSet.size()]);
TermStatistics[] termStatistics = new TermStatistics[terms.length];
IndexReaderContext indexReaderContext = context.searcher().getTopReaderContext();
@@ -78,10 +80,12 @@ public void execute(SearchContext context) {
termStatistics[i] = context.searcher().termStatistics(terms[i], termContext);
}

-TMap<String, CollectionStatistics> fieldStatistics = new ExtTHashMap<String, CollectionStatistics>();
+Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
for (Term term : terms) {
+assert term.field() != null : "field is null";
if (!fieldStatistics.containsKey(term.field())) {
-fieldStatistics.put(term.field(), context.searcher().collectionStatistics(term.field()));
+final CollectionStatistics collectionStatistics = context.searcher().collectionStatistics(term.field());
+fieldStatistics.put(term.field(), collectionStatistics);
}
}

@@ -90,6 +94,10 @@
.maxDoc(context.searcher().getIndexReader().maxDoc());
} catch (Exception e) {
throw new DfsPhaseExecutionException(context, "Exception during dfs phase", e);
+} finally {
+if (termsSet != null) {
+termsSet.clear(); // don't hold on to terms
+}
}
}
}
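
The declaration of cachedTermsSet is outside this excerpt. A hedged sketch of the per-thread scratch-set pattern the try/finally change protects: the set is reused across requests on the same thread, so it must always be cleared, or the Term objects it holds would stay pinned until the next DFS phase runs on that thread.

import gnu.trove.set.hash.THashSet;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;

public class BorrowedTermsSetSketch {
    // per-thread scratch set, reused across requests on the same thread
    private static final ThreadLocal<THashSet<Term>> CACHED_TERMS = new ThreadLocal<THashSet<Term>>() {
        @Override
        protected THashSet<Term> initialValue() {
            return new THashSet<Term>();
        }
    };

    void collectTerms(Query query) {
        THashSet<Term> termsSet = null;
        try {
            termsSet = CACHED_TERMS.get();
            query.extractTerms(termsSet);
            // ... look up term and field statistics ...
        } finally {
            if (termsSet != null) {
                termsSet.clear(); // don't hold on to terms between requests
            }
        }
    }
}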
