LuceneDocCollector: filter for already collected documents on ordered queries
Philipp Bogensberger committed Apr 16, 2015
1 parent 1e89bad commit b92c683
Showing 4 changed files with 113 additions and 17 deletions.
CHANGES.txt (1 addition, 1 deletion)
@@ -5,7 +5,7 @@ Changes for Crate
Unreleased
==========

- Reduced amount of used memory on sorted queries
- Increased speed and reduced amount of used memory on sorted queries

- Using unsupported URI schema in COPY FROM statement now results in an error.


LuceneDocCollector.java
@@ -27,15 +27,19 @@
import io.crate.analyze.OrderBy;
import io.crate.breaker.CrateCircuitBreakerService;
import io.crate.breaker.RamAccountingContext;
import io.crate.lucene.QueryBuilderHelper;
import io.crate.metadata.Functions;
import io.crate.operation.*;
import io.crate.operation.reference.doc.lucene.CollectorContext;
import io.crate.operation.reference.doc.lucene.LuceneCollectorExpression;
import io.crate.operation.reference.doc.lucene.LuceneDocLevelReferenceResolver;
import io.crate.operation.reference.doc.lucene.OrderByCollectorExpression;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.symbol.Reference;
import io.crate.planner.symbol.Symbol;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;

@@ -216,7 +220,15 @@ public void doCollect(RamAccountingContext ramAccountingContext) throws Exception
ScoreDoc lastCollected = collectTopFields(topFieldDocs);
while ((limit == null || collected < limit) && topFieldDocs.scoreDocs.length >= batchSize && lastCollected != null) {
batchSize = limit == null ? pageSize : Math.min(pageSize, limit - collected);
topFieldDocs = (TopFieldDocs)searchContext.searcher().searchAfter(lastCollected, query, batchSize, sort);
Query alreadyCollectedQuery = alreadyCollectedQuery((FieldDoc)lastCollected);
if (alreadyCollectedQuery != null) {
BooleanQuery searchAfterQuery = new BooleanQuery();
searchAfterQuery.add(query, BooleanClause.Occur.MUST);
searchAfterQuery.add(alreadyCollectedQuery, BooleanClause.Occur.MUST_NOT);
topFieldDocs = (TopFieldDocs)searchContext.searcher().searchAfter(lastCollected, searchAfterQuery, batchSize, sort);
} else {
topFieldDocs = (TopFieldDocs)searchContext.searcher().searchAfter(lastCollected, query, batchSize, sort);
}
collected += topFieldDocs.scoreDocs.length;
lastCollected = collectTopFields(topFieldDocs);
}
@@ -263,4 +275,22 @@ private ScoreDoc collectTopFields(TopFieldDocs topFieldDocs) throws IOException{
}
return lastDoc;
}

private @Nullable Query alreadyCollectedQuery(FieldDoc lastCollected) {
if (orderBy.isSorted()) {
Symbol order = orderBy.orderBySymbols().get(0);
Object value = lastCollected.fields[0];
// only filter for null values if nulls last
if (order instanceof Reference && (value != null || !orderBy.nullsFirst()[0])) {
QueryBuilderHelper helper = QueryBuilderHelper.forType(order.valueType());
String columnName = ((Reference)order).info().ident().columnIdent().fqn();
if (orderBy.reverseFlags()[0]) {
return helper.rangeQuery(columnName, value, null, false, false );
} else {
return helper.rangeQuery(columnName, null, value, false, false );
}
}
}
return null;
}
}
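
Editor's note: the following is a minimal, self-contained sketch of the paging pattern introduced above, written against the plain Lucene 4.x API rather than Crate's collector classes. The field name "country", the sample rows, the page size, and the version constant are illustrative assumptions, and TermRangeQuery stands in for Crate's QueryBuilderHelper.rangeQuery(). The committed code additionally flips the range bounds for descending sorts and skips the filter while null sort values may still appear (see alreadyCollectedQuery above); the sketch covers only the ascending, non-null case.

// Sketch: page through a sorted result with searchAfter() while excluding documents
// whose sort value was already fully collected in earlier pages.
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.*;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Version;

public class SearchAfterPagingSketch {

    public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_0, new KeywordAnalyzer());
        try (IndexWriter writer = new IndexWriter(dir, config)) {
            for (String country : new String[]{"Austria", "Germany", "USA", "USA", "Germany"}) {
                Document doc = new Document();
                doc.add(new StringField("country", country, Field.Store.YES));       // used by the range filter
                doc.add(new SortedDocValuesField("country", new BytesRef(country))); // used for sorting
                writer.addDocument(doc);
            }
        }

        IndexSearcher searcher = new IndexSearcher(DirectoryReader.open(dir));
        Query query = new MatchAllDocsQuery();
        Sort sort = new Sort(new SortField("country", SortField.Type.STRING)); // ascending
        int pageSize = 2;

        TopFieldDocs page = searcher.search(query, pageSize, sort);
        while (page.scoreDocs.length > 0) {
            for (ScoreDoc scoreDoc : page.scoreDocs) {
                System.out.println(searcher.doc(scoreDoc.doc).get("country"));
            }
            FieldDoc lastCollected = (FieldDoc) page.scoreDocs[page.scoreDocs.length - 1];
            BytesRef lastValue = (BytesRef) lastCollected.fields[0];

            // Ascending sort: every document whose value sorts strictly below the last
            // collected value is already fully collected, so exclude it up front. Ties on
            // the boundary value are still resolved by searchAfter's own comparator.
            Query alreadyCollected = new TermRangeQuery("country", null, lastValue, false, false);
            BooleanQuery nextQuery = new BooleanQuery();
            nextQuery.add(query, BooleanClause.Occur.MUST);
            nextQuery.add(alreadyCollected, BooleanClause.Occur.MUST_NOT);

            page = (TopFieldDocs) searcher.searchAfter(lastCollected, nextQuery, pageSize, sort);
        }
    }
}

Without the MUST_NOT clause, each searchAfter round still visits and compares every matching document against the paging boundary; with it, documents that sort strictly before the boundary never reach the collector, which appears to be where the speed-up noted in CHANGES.txt comes from.
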
LuceneDocCollectorBenchmark.java
@@ -75,6 +75,8 @@
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -97,6 +99,8 @@ public class LuceneDocCollectorBenchmark extends BenchmarkBase {
public static final int BENCHMARK_ROUNDS = 100;
public static final int WARMUP_ROUNDS = 10;

public static final int PAGE_SIZE = 10_000;

public final static ESLogger logger = Loggers.getLogger(LuceneDocCollectorBenchmark.class);

private ShardId shardId = new ShardId(INDEX_NAME, 0);
@@ -196,9 +200,11 @@ private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, Pr
JobCollectContext jobCollectContext = collectContextService.acquireContext(node.jobId().get());
jobCollectContext.registerJobContextId(shardId, jobSearchContextId);
LuceneDocCollector collector = (LuceneDocCollector)shardCollectService.getCollector(node, projectorChain, jobCollectContext, 0);
collector.pageSize(PAGE_SIZE);
return collector;
}

@Override
protected void doGenerateData() throws Exception {
if (!dataGenerated) {

@@ -246,9 +252,11 @@ public void run() {
@BenchmarkOptions(benchmarkRounds = BENCHMARK_ROUNDS, warmupRounds = WARMUP_ROUNDS)
@Test
public void testLuceneDocCollectorOrderedWithScrollingPerformance() throws Exception{
collectingProjector.rows.clear();
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols());
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
collectingProjector.finish();
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCUMENTS));
}

@BenchmarkOptions(benchmarkRounds = BENCHMARK_ROUNDS, warmupRounds = WARMUP_ROUNDS)

LuceneDocCollectorTest.java
@@ -60,16 +60,17 @@

import static io.crate.testing.TestingHelpers.createReference;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@CrateIntegrationTest.ClusterScope(scope = CrateIntegrationTest.Scope.SUITE, numNodes = 1)
public class LuceneDocCollectorTest extends SQLTransportIntegrationTest {

private final static Integer PAGE_SIZE = 2000;
private final static Integer PAGE_SIZE = 20;
private final static String INDEX_NAME = "countries";
private final static Integer NUMBER_OF_DOCS = 2500;
private final static Integer NUMBER_OF_DOCS = 25;
private ShardId shardId = new ShardId(INDEX_NAME, 0);
private OrderBy orderBy;
private CollectContextService collectContextService;
@@ -118,6 +119,8 @@ public void generateData() throws Exception {
indexRequest.source(generateRowSource("Europe", "Germany", i));
} else if (i == 1) {
indexRequest.source(generateRowSource("Europe", "Austria", i));
} else if (i >= 2 && i <=4) {
indexRequest.source(generateRowSource("Europe", null, i));
} else {
indexRequest.source(generateRowSource("America", "USA", i));
}
@@ -129,10 +132,10 @@
}

private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, List<Symbol> toCollect) throws Exception{
return createDocCollector(orderBy, limit, toCollect, WhereClause.MATCH_ALL);
return createDocCollector(orderBy, limit, toCollect, WhereClause.MATCH_ALL, PAGE_SIZE);
}

private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, List<Symbol> toCollect, WhereClause whereClause) throws Exception{
private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, List<Symbol> toCollect, WhereClause whereClause, int pageSize) throws Exception{
CollectNode node = new CollectNode();
node.whereClause(whereClause);
node.orderBy(orderBy);
@@ -148,24 +151,24 @@ private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, Li
JobCollectContext jobCollectContext = collectContextService.acquireContext(node.jobId().get());
jobCollectContext.registerJobContextId(shardId, jobSearchContextId);
LuceneDocCollector collector = (LuceneDocCollector)shardCollectService.getCollector(node, projectorChain, jobCollectContext, 0);
collector.pageSize(PAGE_SIZE);
collector.pageSize(pageSize);
return collector;
}

@Test
public void testLimitWithoutOrder() throws Exception{
collectingProjector.rows.clear();
LuceneDocCollector docCollector = createDocCollector(null, 500, orderBy.orderBySymbols());
LuceneDocCollector docCollector = createDocCollector(null, 15, orderBy.orderBySymbols());
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(500));
assertThat(collectingProjector.rows.size(), is(15));
}

@Test
public void testOrderedWithLimit() throws Exception{
collectingProjector.rows.clear();
LuceneDocCollector docCollector = createDocCollector(orderBy, 500, orderBy.orderBySymbols());
LuceneDocCollector docCollector = createDocCollector(orderBy, 15, orderBy.orderBySymbols());
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(500));
assertThat(collectingProjector.rows.size(), is(15));
assertThat(((BytesRef)collectingProjector.rows.get(0)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(1)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(2)[0]).utf8ToString(), is("USA") );
@@ -175,9 +178,9 @@ public void testOrderedWithLimit() throws Exception{
@Test
public void testOrderedWithLimitHigherThanPageSize() throws Exception{
collectingProjector.rows.clear();
LuceneDocCollector docCollector = createDocCollector(orderBy, PAGE_SIZE + 250, orderBy.orderBySymbols());
LuceneDocCollector docCollector = createDocCollector(orderBy, PAGE_SIZE + 5, orderBy.orderBySymbols());
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(PAGE_SIZE + 250));
assertThat(collectingProjector.rows.size(), is(PAGE_SIZE + 5));
assertThat(((BytesRef)collectingProjector.rows.get(0)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(1)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(2)[0]).utf8ToString(), is("USA") );
@@ -187,12 +190,64 @@ public void testOrderedWithLimitHigherThanPageSize() throws Exception{
@Test
public void testOrderedWithoutLimit() throws Exception {
collectingProjector.rows.clear();
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols());
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols(), WhereClause.MATCH_ALL, 1);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
assertThat(((BytesRef)collectingProjector.rows.get(0)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(1)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(2)[0]).utf8ToString(), is("USA") );
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS -1)[0], is(nullValue()));
}

@Test
public void testOrderedNullsFirstWithoutLimit() throws Exception {
collectingProjector.rows.clear();
ReferenceIdent ident = new ReferenceIdent(new TableIdent("doc", "countries"), "countryName");
Reference ref = new Reference(new ReferenceInfo(ident, RowGranularity.DOC, DataTypes.STRING));
OrderBy orderBy = new OrderBy(ImmutableList.of((Symbol)ref), new boolean[]{false}, new Boolean[]{true});
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols(), WhereClause.MATCH_ALL, 1);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
assertThat(collectingProjector.rows.get(0)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(1)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(2)[0], is(nullValue()));
assertThat(((BytesRef)collectingProjector.rows.get(3)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(4)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(5)[0]).utf8ToString(), is("USA") );
}

@Test
public void testOrderedDescendingWithoutLimit() throws Exception {
collectingProjector.rows.clear();
ReferenceIdent ident = new ReferenceIdent(new TableIdent("doc", "countries"), "countryName");
Reference ref = new Reference(new ReferenceInfo(ident, RowGranularity.DOC, DataTypes.STRING));
OrderBy orderBy = new OrderBy(ImmutableList.of((Symbol)ref), new boolean[]{true}, new Boolean[]{false});
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols(), WhereClause.MATCH_ALL, 1);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 1)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 2)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 3)[0], is(nullValue()));
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 4)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 5)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 6)[0]).utf8ToString(), is("USA") );
}

@Test
public void testOrderedDescendingNullsFirstWithoutLimit() throws Exception {
collectingProjector.rows.clear();
ReferenceIdent ident = new ReferenceIdent(new TableIdent("doc", "countries"), "countryName");
Reference ref = new Reference(new ReferenceInfo(ident, RowGranularity.DOC, DataTypes.STRING));
OrderBy orderBy = new OrderBy(ImmutableList.of((Symbol)ref), new boolean[]{true}, new Boolean[]{true});
LuceneDocCollector docCollector = createDocCollector(orderBy, null, orderBy.orderBySymbols(), WhereClause.MATCH_ALL, 1);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
assertThat(collectingProjector.rows.get(0)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(1)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(2)[0], is(nullValue()));
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 1)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 2)[0]).utf8ToString(), is("Germany") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 3)[0]).utf8ToString(), is("USA") );
}

@Test
@@ -210,7 +265,10 @@ public void testOrderForNonSelected() throws Exception {
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
assertThat(collectingProjector.rows.get(0).length, is(1));
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 3)[0]).utf8ToString(), is("USA") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 6)[0]).utf8ToString(), is("USA") );
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 5)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 4)[0], is(nullValue()));
assertThat(collectingProjector.rows.get(NUMBER_OF_DOCS - 3)[0], is(nullValue()));
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 2)[0]).utf8ToString(), is("Austria") );
assertThat(((BytesRef)collectingProjector.rows.get(NUMBER_OF_DOCS - 1)[0]).utf8ToString(), is("Germany") );
}
@@ -247,7 +305,7 @@ public void testMinScoreQuery() throws Exception {
Arrays.<Symbol>asList(minScore_ref, Literal.newLiteral(1.1))
);
WhereClause whereClause = new WhereClause(function);
LuceneDocCollector docCollector = createDocCollector(null, null, orderBy.orderBySymbols(), whereClause);
LuceneDocCollector docCollector = createDocCollector(null, null, orderBy.orderBySymbols(), whereClause, PAGE_SIZE);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(0));

Expand All @@ -259,7 +317,7 @@ public void testMinScoreQuery() throws Exception {
Arrays.<Symbol>asList(minScore_ref, Literal.newLiteral(1.0))
);
whereClause = new WhereClause(function);
docCollector = createDocCollector(null, null, orderBy.orderBySymbols(), whereClause);
docCollector = createDocCollector(null, null, orderBy.orderBySymbols(), whereClause, PAGE_SIZE);
docCollector.doCollect(RAM_ACCOUNTING_CONTEXT);
assertThat(collectingProjector.rows.size(), is(NUMBER_OF_DOCS));
}
