Skip to content

Commit

Permalink
made OrderBy streamable so it’s possible to assign
Browse files Browse the repository at this point in the history
it directly to the collector
  • Loading branch information
Philipp Bogensberger committed Mar 2, 2015
1 parent 47938e7 commit 51d90af
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.crate.Constants;
import io.crate.analyze.OrderBy;
import io.crate.core.StringUtils;
import io.crate.executor.transport.task.elasticsearch.SortOrder;
import io.crate.lucene.LuceneQueryBuilder;
Expand Down Expand Up @@ -360,12 +361,10 @@ private static Sort generateLuceneSort(SearchContext context,

@Nullable
public static Sort generateLuceneSort(SearchContext context,
List<Symbol> symbols,
boolean[] reverseFlags,
Boolean[] nullsFirst,
OrderBy orderBy,
CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor) {
SortSymbolVisitor sortSymbolVisitor = new SortSymbolVisitor(inputSymbolVisitor);
return generateLuceneSort(context, symbols, reverseFlags, nullsFirst, sortSymbolVisitor);
return generateLuceneSort(context, orderBy.orderBySymbols(), orderBy.reverseFlags(), orderBy.nullsFirst(), sortSymbolVisitor);
}

private static class SortSymbolContext {
Expand Down
53 changes: 52 additions & 1 deletion sql/src/main/java/io/crate/analyze/OrderBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import io.crate.planner.symbol.Function;
import io.crate.planner.symbol.Symbol;
import io.crate.planner.symbol.SymbolVisitor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OrderBy {
public class OrderBy implements Streamable {

private List<Symbol> orderBySymbols;
private boolean[] reverseFlags;
Expand All @@ -43,6 +48,8 @@ public OrderBy(List<Symbol> orderBySymbols, boolean[] reverseFlags, Boolean[] nu
this.nullsFirst = nullsFirst;
}

private OrderBy() {};

public List<Symbol> orderBySymbols() {
return orderBySymbols;
}
Expand Down Expand Up @@ -83,6 +90,50 @@ public static OrderBy fromSymbols(List<Symbol> symbols) {
return new OrderBy(symbols, reverseFlags, nullsFirst);
}

public static void toStream(OrderBy orderBy, StreamOutput out) throws IOException {
orderBy.writeTo(out);
}

public static OrderBy fromStream(StreamInput in) throws IOException {
OrderBy orderBy = new OrderBy();
orderBy.readFrom(in);
return orderBy;
}

@Override
public void readFrom(StreamInput in) throws IOException {
int numOrderBy = in.readVInt();
reverseFlags = new boolean[numOrderBy];

for (int i = 0; i < reverseFlags.length; i++) {
reverseFlags[i] = in.readBoolean();
}

orderBySymbols = new ArrayList<>(numOrderBy);
for (int i = 0; i < reverseFlags.length; i++) {
orderBySymbols.add(Symbol.fromStream(in));
}

nullsFirst = new Boolean[numOrderBy];
for (int i = 0; i < numOrderBy; i++) {
nullsFirst[i] = in.readOptionalBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(reverseFlags.length);
for (boolean reverseFlag : reverseFlags) {
out.writeBoolean(reverseFlag);
}
for (Symbol symbol : orderBySymbols) {
Symbol.toStream(symbol, out);
}
for (Boolean nullFirst : nullsFirst) {
out.writeOptionalBoolean(nullFirst);
}
}

private static class SortSymbolContext {
public boolean hasFunction = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Context process(CollectNode node) {
if (node.toCollect() != null) {
for (Symbol symbol : node.toCollect()) {
if(node.orderBy() != null && node.limit() != null &&
node.orderBy().contains(symbol) &&
node.orderBy().orderBySymbols().contains(symbol) &&
!node.isSystemSchema()) {
context.addOrderBySymbol(symbol);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import io.crate.Constants;
import io.crate.action.sql.query.CrateSearchService;
import io.crate.analyze.OrderBy;
import io.crate.analyze.WhereClause;
import io.crate.breaker.CrateCircuitBreakerService;
import io.crate.breaker.RamAccountingContext;
Expand Down Expand Up @@ -110,10 +111,7 @@ public void required(boolean required) {
private final List<Input<?>> topLevelInputs;
private final List<LuceneCollectorExpression<?>> collectorExpressions;
private final Integer limit;
private final List<Symbol> orderBy;
private final boolean[] reverseFlags;
private final Boolean[] nullsFirst;

private final OrderBy orderBy;

public LuceneDocCollector(ThreadPool threadPool,
ClusterService clusterService,
Expand All @@ -128,14 +126,10 @@ public LuceneDocCollector(ThreadPool threadPool,
Functions functions,
WhereClause whereClause,
Projector downStreamProjector,
Integer limit,
@Nullable List<Symbol> orderBy,
@Nullable boolean[] reverseFlags,
@Nullable Boolean[] nullsFirst) throws Exception {
@Nullable Integer limit,
@Nullable OrderBy orderBy) {
this.limit = limit;
this.orderBy = orderBy;
this.reverseFlags = reverseFlags;
this.nullsFirst = nullsFirst;
downstream(downStreamProjector);
SearchShardTarget searchShardTarget = new SearchShardTarget(
clusterService.localNode().id(), shardId.getIndex(), shardId.id());
Expand Down Expand Up @@ -218,14 +212,15 @@ public void setNextOrderByValues(ScoreDoc scoreDoc, SortField[] sortFields) {
for (LuceneCollectorExpression expr : collectorExpressions) {
if ( expr instanceof PrefetchedValueCollectorExpression) {
Object fieldValue = ((FieldDoc)scoreDoc).fields[i];
Object missingValue = missingValue(reverseFlags[i],
nullsFirst[i],
Object missingValue = missingValue(orderBy.reverseFlags()[i],
orderBy.nullsFirst()[i],
((IndexFieldData.XFieldComparatorSource)sortFields[i].getComparatorSource()).reducedType());
if(fieldValue != null && fieldValue.equals(missingValue)) {
fieldValue = null;
}
Object value = ((PrefetchedValueCollectorExpression) expr).valueType().value(fieldValue);
((PrefetchedValueCollectorExpression) expr).value(value);
i++;
}
}
}
Expand Down Expand Up @@ -255,9 +250,7 @@ public void doCollect(RamAccountingContext ramAccountingContext) throws Exceptio
// do the lucene search
try {
if( orderBy != null && limit != null) {
Sort sort = CrateSearchService.generateLuceneSort(searchContext, orderBy,
reverseFlags, nullsFirst,
inputSymbolVisitor);
Sort sort = CrateSearchService.generateLuceneSort(searchContext, orderBy, inputSymbolVisitor);
TopFieldDocs topFieldDocs = searchContext.searcher().search(query, limit, sort);
IndexReaderContext indexReaderContext = searchContext.searcher().getTopReaderContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ private CrateCollector getLuceneIndexCollector(CollectNode collectNode, Projecto
collectNode.whereClause(),
downstream,
collectNode.limit(),
collectNode.orderBy(),
collectNode.reverseFlags(),
collectNode.nullsFirst());
collectNode.orderBy());
}
}
12 changes: 2 additions & 10 deletions sql/src/main/java/io/crate/planner/PlanNodeBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ public static CollectNode distributingCollect(TableInfo tableInfo,
@Nullable OrderBy orderBy,
@Nullable Integer limit) {
CollectNode node = distributingCollect(tableInfo, whereClause, toCollect, downstreamNodes, projections);
if( orderBy != null) {
node.orderBy(orderBy.orderBySymbols());
node.reverseFlags(orderBy.reverseFlags());
node.nullsFirst(orderBy.nullsFirst());
}
node.orderBy(orderBy);
if (limit != null) {
node.limit(limit);
}
Expand Down Expand Up @@ -143,11 +139,7 @@ public static CollectNode collect(TableInfo tableInfo,
node.isPartitioned(tableInfo.isPartitioned());
node.isSystemSchema(tableInfo.schemaInfo().systemSchema());
setOutputTypes(node);
if( orderBy != null) {
node.orderBy(orderBy.orderBySymbols());
node.reverseFlags(orderBy.reverseFlags());
node.nullsFirst(orderBy.nullsFirst());
}
node.orderBy(orderBy);
if (limit != null) {
node.limit(limit);
}
Expand Down
65 changes: 11 additions & 54 deletions sql/src/main/java/io/crate/planner/node/dql/CollectNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.crate.analyze.EvaluatingNormalizer;
import io.crate.analyze.OrderBy;
import io.crate.analyze.WhereClause;
import io.crate.metadata.Routing;
import io.crate.planner.RowGranularity;
Expand Down Expand Up @@ -56,9 +57,7 @@ public class CollectNode extends AbstractDQLPlanNode {
private boolean isSystemSchema = false;

private Integer limit = null;
private List<Symbol> orderBy;
boolean[] reverseFlags;
private Boolean[] nullsFirst;
private OrderBy orderBy = null;

public CollectNode(String id) {
super(id);
Expand Down Expand Up @@ -105,34 +104,14 @@ public void limit(int limit) {
this.limit = limit;
}

public List<Symbol> orderBy() {
public @Nullable OrderBy orderBy() {
return orderBy;
}

public void orderBy(@Nullable List<Symbol> orderBy) {
public void orderBy(@Nullable OrderBy orderBy) {
this.orderBy = orderBy;
}

public boolean[] reverseFlags() {
return reverseFlags;
}

public void reverseFlags(boolean[] reverseFlags) {
this.reverseFlags = reverseFlags;
}

public boolean isOrdered() {
return reverseFlags != null && reverseFlags.length > 0;
}

public Boolean[] nullsFirst() {
return nullsFirst;
}

public void nullsFirst(Boolean[] nullsFirst) {
this.nullsFirst = nullsFirst;
}

public CollectNode(String id, Routing routing, List<Symbol> toCollect, List<Projection> projections) {
super(id);
this.routing = routing;
Expand Down Expand Up @@ -247,25 +226,11 @@ public void readFrom(StreamInput in) throws IOException {
if( in.readBoolean()) {
limit = in.readVInt();
}
int numOrderBy = in.readVInt();

if (numOrderBy > 0) {
reverseFlags = new boolean[numOrderBy];

for (int i = 0; i < reverseFlags.length; i++) {
reverseFlags[i] = in.readBoolean();
}

orderBy = new ArrayList<>(numOrderBy);
for (int i = 0; i < reverseFlags.length; i++) {
orderBy.add(Symbol.fromStream(in));
}

nullsFirst = new Boolean[numOrderBy];
for (int i = 0; i < numOrderBy; i++) {
nullsFirst[i] = in.readOptionalBoolean();
}
if (in.readBoolean()) {
orderBy = OrderBy.fromStream(in);
}

isSystemSchema = in.readBoolean();
}

Expand Down Expand Up @@ -308,19 +273,11 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (isOrdered()) {
out.writeVInt(reverseFlags.length);
for (boolean reverseFlag : reverseFlags) {
out.writeBoolean(reverseFlag);
}
for (Symbol symbol : orderBy) {
Symbol.toStream(symbol, out);
}
for (Boolean nullFirst : nullsFirst) {
out.writeOptionalBoolean(nullFirst);
}
if (orderBy != null) {
out.writeBoolean(true);
OrderBy.toStream(orderBy, out);
} else {
out.writeVInt(0);
out.writeBoolean(false);
}
out.writeBoolean(isSystemSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

package io.crate.integrationtests;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import io.crate.action.sql.SQLActionException;
import io.crate.action.sql.SQLResponse;
import io.crate.test.integration.CrateIntegrationTest;
Expand Down Expand Up @@ -1067,7 +1066,7 @@ public void groupBySortOnGroupingKeyNonDistributedGroupBy() throws Exception {
}

@Test
public void testGroupByOnGroupingKeyReduceOnCollectorGroupBy() throws Exception {
public void testGroupByReduceOnCollectorGroupBy() throws Exception {
execute("create table foo (id int primary key, name string primary key) with (number_of_replicas = 0)");
ensureGreen();

Expand Down
14 changes: 7 additions & 7 deletions sql/src/test/java/io/crate/planner/PlannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,8 @@ public void testNonDistributedGroupByOrderOnCollectNode() throws Exception {
"select id from users group by id order by id limit 2 offset 2");
CollectNode collectNode = planNode.collectNode();
assertThat(collectNode.limit(), is(4)); // limit + offset
assertThat(collectNode.orderBy().size(), is(1));
assertThat(((Reference)collectNode.orderBy().get(0)).ident().columnIdent().name(), is("id"));
assertThat(collectNode.orderBy().orderBySymbols().size(), is(1));
assertThat(((Reference)collectNode.orderBy().orderBySymbols().get(0)).ident().columnIdent().name(), is("id"));
assertThat(((GroupProjection)collectNode.projections().get(0)).limit(), is(4));
}

Expand All @@ -1337,8 +1337,8 @@ public void testDistributedGroupByOrderOnCollectNode() throws Exception {
"select name from users group by name order by name desc limit 1 offset 3");
CollectNode collectNode = distributedGroupBy.collectNode();
assertThat(collectNode.limit(), is(4));
assertThat(collectNode.orderBy().size(), is(1));
assertThat(((Reference)collectNode.orderBy().get(0)).ident().columnIdent().name(), is("name"));
assertThat(collectNode.orderBy().orderBySymbols().size(), is(1));
assertThat(((Reference)collectNode.orderBy().orderBySymbols().get(0)).ident().columnIdent().name(), is("name"));
assertThat(((GroupProjection)collectNode.projections().get(0)).limit(), is(4));
}

Expand All @@ -1361,9 +1361,9 @@ public void testDistributedGroupbyAutoOrderByGroupKeys() throws Exception {
DistributedGroupBy planNode = (DistributedGroupBy) plan(
"select id, name from users group by id, name limit 2");
CollectNode collectNode = planNode.collectNode();
assertThat(collectNode.orderBy().size(), is(2));
assertThat(((Reference)collectNode.orderBy().get(0)).ident().columnIdent().name(), is("id"));
assertThat(((Reference)collectNode.orderBy().get(1)).ident().columnIdent().name(), is("name"));
assertThat(collectNode.orderBy().orderBySymbols().size(), is(2));
assertThat(((Reference)collectNode.orderBy().orderBySymbols().get(0)).ident().columnIdent().name(), is("id"));
assertThat(((Reference)collectNode.orderBy().orderBySymbols().get(1)).ident().columnIdent().name(), is("name"));
}

@Test
Expand Down

0 comments on commit 51d90af

Please sign in to comment.