Skip to content

Commit

Permalink
support functions in select-list on queries that do pk lookups
Browse files Browse the repository at this point in the history
Also made the ESGetTask standalone so it doesn't require an
additional MergeTask.

In addition the plan for insert from query has changed so
that it will always use QueryAndFetch to avoid handler
round-trips.
  • Loading branch information
mfussenegger committed Oct 27, 2014
1 parent 3bbc7fe commit 33acebe
Show file tree
Hide file tree
Showing 9 changed files with 515 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.crate.operation.ImplementationSymbolVisitor;
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
import io.crate.planner.Plan;
import io.crate.planner.RowGranularity;
import io.crate.planner.node.PlanNode;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class TransportExecutor implements Executor {

// operation for handler side collecting
private final HandlerSideDataCollectOperation handlerSideDataCollectOperation;
private final ProjectionToProjectorVisitor projectorVisitor;

@Inject
public TransportExecutor(Settings settings,
Expand All @@ -92,6 +94,10 @@ public TransportExecutor(Settings settings,
this.statsTables = statsTables;
this.clusterService = clusterService;
this.visitor = new Visitor();
ImplementationSymbolVisitor clusterImplementationSymbolVisitor =
new ImplementationSymbolVisitor(referenceResolver, functions, RowGranularity.CLUSTER);
projectorVisitor = new ProjectionToProjectorVisitor(
clusterService, settings, transportActionProvider, clusterImplementationSymbolVisitor);
}

@Override
Expand Down Expand Up @@ -173,6 +179,8 @@ public Void visitESSearchNode(QueryThenFetchNode node, Job context) {
@Override
public Void visitESGetNode(ESGetNode node, Job context) {
context.addTask(new ESGetTask(
functions,
projectorVisitor,
transportActionProvider.transportMultiGetAction(),
transportActionProvider.transportGetAction(),
node));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@
*/
public class FlatProjectorChain {

private final ProjectionToProjectorVisitor projectorVisitor;
private Projector firstProjector;
private final List<Projector> projectors;
private ResultProvider lastProjector;

public FlatProjectorChain(List<Projection> projections, ProjectionToProjectorVisitor projectorVisitor) {
projectors = new ArrayList<>();
this.projectorVisitor = projectorVisitor;
if (projections.size() == 0) {
firstProjector = new CollectingProjector();
lastProjector = (ResultProvider)firstProjector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public interface Projector extends ProjectorUpstream {
*
* This method must be thread safe.
*
* @param row
* @return false if this projection does not need any more rows, true otherwise.
*/
public boolean setNextRow(Object ... row);
Expand Down
42 changes: 12 additions & 30 deletions sql/src/main/java/io/crate/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ protected Plan visitSelectAnalysis(SelectAnalysis analysis, Context context) {
globalAggregates(analysis, plan, context);
} else {
WhereClause whereClause = analysis.whereClause();
if (analysis.rowGranularity().ordinal() >= RowGranularity.DOC.ordinal() &&
if (!context.indexWriterProjection.isPresent()
&& analysis.rowGranularity().ordinal() >= RowGranularity.DOC.ordinal() &&
analysis.table().getRouting(whereClause).hasLocations() &&
(analysis.table().ident().schema() == null || analysis.table().ident().schema().equals(DocSchemaInfo.NAME))
&& (analysis.isLimited() || analysis.ids().size() > 0 || !context.indexWriterProjection.isPresent())) {
analysis.table().schemaInfo().name().equals(DocSchemaInfo.NAME)) {

if (analysis.ids().size() > 0
&& analysis.routingValues().size() > 0
Expand Down Expand Up @@ -479,45 +479,27 @@ private void ESDeleteByQuery(DeleteAnalysis.NestedDeleteAnalysis analysis, Plan
}

private void ESGet(SelectAnalysis analysis, Plan plan, Context context) {
PlannerContextBuilder contextBuilder = new PlannerContextBuilder()
.output(analysis.outputSymbols())
.orderBy(analysis.sortSymbols());

assert !context.indexWriterProjection.isPresent() : "shouldn't use ESGet with indexWriterProjection";
String indexName;
if (analysis.table().isPartitioned()) {
assert analysis.whereClause().partitions().size() == 1 : "ambiguous partitions for ESGet";
indexName = analysis.whereClause().partitions().get(0);
} else {
indexName = analysis.table().ident().name();
}

ESGetNode getNode = new ESGetNode(
indexName,
analysis.outputSymbols(),
extractDataTypes(analysis.outputSymbols()),
analysis.ids(),
analysis.routingValues(),
analysis.sortSymbols(),
analysis.reverseFlags(),
analysis.nullsFirst(),
analysis.limit(),
analysis.offset(),
analysis.table().partitionedByColumns());
getNode.outputs(contextBuilder.toCollect());
getNode.outputTypes(extractDataTypes(analysis.outputSymbols()));
plan.add(getNode);

// handle sorting, limit and offset
if (analysis.isSorted() || analysis.limit() != null
|| analysis.offset() > 0
|| context.indexWriterProjection.isPresent()) {
TopNProjection tnp = new TopNProjection(
Objects.firstNonNull(analysis.limit(), Constants.DEFAULT_SELECT_LIMIT),
analysis.offset(),
contextBuilder.orderBy(),
analysis.reverseFlags(),
analysis.nullsFirst()
);
tnp.outputs(contextBuilder.outputs());
ImmutableList.Builder<Projection> projectionBuilder = ImmutableList.<Projection>builder().add(tnp);
if (context.indexWriterProjection.isPresent()) {
projectionBuilder.add(context.indexWriterProjection.get());
}
plan.add(PlanNodeBuilder.localMerge(projectionBuilder.build(), getNode));
}
}

private void normalSelect(SelectAnalysis analysis, Plan plan, Context context) {
Expand Down Expand Up @@ -1091,7 +1073,7 @@ private void ESIndex(InsertFromValuesAnalysis analysis, Plan plan) {
}


static List<DataType> extractDataTypes(List<Symbol> symbols) {
public static List<DataType> extractDataTypes(List<Symbol> symbols) {
List<DataType> types = new ArrayList<>(symbols.size());
for (Symbol symbol : symbols) {
types.add(DataTypeVisitor.fromSymbol(symbol));
Expand Down
60 changes: 47 additions & 13 deletions sql/src/main/java/io/crate/planner/node/dql/ESGetNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import com.google.common.collect.ImmutableList;
import io.crate.metadata.ReferenceInfo;
import io.crate.planner.node.PlanVisitor;
import io.crate.planner.symbol.Symbol;
import io.crate.types.DataType;
import org.elasticsearch.common.Nullable;

import java.util.Arrays;
import java.util.List;


Expand All @@ -36,27 +37,38 @@ public class ESGetNode extends ESDQLPlanNode implements DQLPlanNode {
private final String index;
private final List<String> ids;
private final List<String> routingValues;
private final List<Symbol> sortSymbols;
private final boolean[] reverseFlags;
private final Boolean[] nullsFirst;
private final Integer limit;
private final int offset;
private final List<ReferenceInfo> partitionBy;

private final static boolean[] EMPTY_REVERSE_FLAGS = new boolean[0];
private final static Boolean[] EMPTY_NULLS_FIRST = new Boolean[0];

public ESGetNode(String index,
List<Symbol> outputs,
List<DataType> outputTypes,
List<String> ids,
List<String> routingValues,
@Nullable List<Symbol> sortSymbols,
@Nullable boolean[] reverseFlags,
@Nullable Boolean[] nullsFirst,
@Nullable Integer limit,
int offset,
@Nullable List<ReferenceInfo> partitionBy) {
this.index = index;
this.outputs = outputs;
outputTypes(outputTypes);
this.ids = ids;
this.routingValues = routingValues;
this.partitionBy = Objects.firstNonNull(partitionBy,
ImmutableList.<ReferenceInfo>of());
}

public ESGetNode(String index,
List<String> ids,
List<String> routingValues) {
this(index, ids, routingValues, null);
}

public ESGetNode(String index, String id, String routingValue) {
this(index, Arrays.asList(id), Arrays.asList(routingValue), null);
this.sortSymbols = Objects.firstNonNull(sortSymbols, ImmutableList.<Symbol>of());
this.reverseFlags = Objects.firstNonNull(reverseFlags, EMPTY_REVERSE_FLAGS);
this.nullsFirst = Objects.firstNonNull(nullsFirst, EMPTY_NULLS_FIRST);
this.limit = limit;
this.offset = offset;
this.partitionBy = Objects.firstNonNull(partitionBy, ImmutableList.<ReferenceInfo>of());
}

public String index() {
Expand All @@ -76,6 +88,28 @@ public List<String> routingValues() {
return routingValues;
}

@Nullable
public Integer limit() {
return limit;
}

public int offset() {
return offset;
}

public List<Symbol> sortSymbols() {
return sortSymbols;
}

public boolean[] reverseFlags() {
return reverseFlags;
}

public Boolean[] nullsFirst() {
return nullsFirst;
}


public List<ReferenceInfo> partitionBy() {
return partitionBy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.crate.operation.projectors.TopN;
import io.crate.operation.scalar.DateTruncFunction;
import io.crate.planner.Plan;
import io.crate.planner.Planner;
import io.crate.planner.RowGranularity;
import io.crate.planner.node.dml.ESDeleteByQueryNode;
import io.crate.planner.node.dml.ESDeleteNode;
Expand Down Expand Up @@ -105,6 +106,26 @@ public class TransportExecutorTest extends SQLTransportIntegrationTest {
Reference parted_date_ref = new Reference(new ReferenceInfo(
new ReferenceIdent(partedTable, "date"), RowGranularity.DOC, DataTypes.TIMESTAMP));

private static ESGetNode newGetNode(String index, List<Symbol> outputs, String id) {
return newGetNode(index, outputs, Arrays.asList(id));
}

private static ESGetNode newGetNode(String index, List<Symbol> outputs, List<String> ids) {
return new ESGetNode(
index,
outputs,
Planner.extractDataTypes(outputs),
ids,
ids,
ImmutableList.<Symbol>of(),
new boolean[0],
new Boolean[0],
null,
0,
null
);
}

@Before
public void transportSetUp() {
CrateTestCluster cluster = cluster();
Expand Down Expand Up @@ -196,8 +217,8 @@ public void testMapSideCollectTask() throws Exception {
public void testESGetTask() throws Exception {
insertCharacters();

ESGetNode node = new ESGetNode("characters", "2", "2");
node.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode node = newGetNode("characters", outputs, "2");
Plan plan = new Plan();
plan.add(node);
Job job = executor.newJob(plan);
Expand All @@ -213,9 +234,9 @@ public void testESGetTask() throws Exception {
public void testESGetTaskWithDynamicReference() throws Exception {
insertCharacters();

ESGetNode node = new ESGetNode("characters", "2", "2");
node.outputs(ImmutableList.<Symbol>of(id_ref, new DynamicReference(
new ReferenceIdent(new TableIdent(null, "characters"), "foo"), RowGranularity.DOC)));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, new DynamicReference(
new ReferenceIdent(new TableIdent(null, "characters"), "foo"), RowGranularity.DOC));
ESGetNode node = newGetNode("characters", outputs, "2");
Plan plan = new Plan();
plan.add(node);
Job job = executor.newJob(plan);
Expand All @@ -230,8 +251,8 @@ public void testESGetTaskWithDynamicReference() throws Exception {
@Test
public void testESMultiGet() throws Exception {
insertCharacters();
ESGetNode node = new ESGetNode("characters", asList("1", "2"), asList("1", "2"));
node.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode node = newGetNode("characters", outputs, asList("1", "2"));
Plan plan = new Plan();
plan.add(node);
Job job = executor.newJob(plan);
Expand Down Expand Up @@ -472,8 +493,8 @@ public void testESDeleteTask() throws Exception {
assertThat(taskResult.rowCount(), is(1L));

// verify deletion
ESGetNode getNode = new ESGetNode("characters", "2", "2");
getNode.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode getNode = newGetNode("characters", outputs, "2");
plan = new Plan();
plan.add(getNode);
job = executor.newJob(plan);
Expand Down Expand Up @@ -514,8 +535,8 @@ public void testESIndexTask() throws Exception {


// verify insertion
ESGetNode getNode = new ESGetNode("characters", "99", "99");
getNode.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode getNode = newGetNode("characters", outputs, "99");
plan = new Plan();
plan.add(getNode);
job = executor.newJob(plan);
Expand Down Expand Up @@ -633,10 +654,8 @@ public void testESBulkInsertTask() throws Exception {

// verify insertion

ESGetNode getNode = new ESGetNode("characters",
Arrays.asList("99", "42"),
Arrays.asList("99", "42"));
getNode.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode getNode = newGetNode("characters", outputs, Arrays.asList("99", "42"));
plan = new Plan();
plan.add(getNode);
job = executor.newJob(plan);
Expand Down Expand Up @@ -681,8 +700,8 @@ public void testESUpdateByIdTask() throws Exception {
assertThat(taskResult.rowCount(), is(1L));

// verify update
ESGetNode getNode = new ESGetNode("characters", Arrays.asList("1"), Arrays.asList("1"));
getNode.outputs(ImmutableList.<Symbol>of(id_ref, name_ref));
ImmutableList<Symbol> outputs = ImmutableList.<Symbol>of(id_ref, name_ref);
ESGetNode getNode = newGetNode("characters", outputs, "1");
plan = new Plan();
plan.add(getNode);
job = executor.newJob(plan);
Expand Down Expand Up @@ -734,8 +753,8 @@ public void testUpdateByQueryTaskWithVersion() throws Exception {
assertThat(result.get(0).get().errorMessage(), is(nullValue()));
assertThat(result.get(0).get().rowCount(), is(1L));

ESGetNode getNode = new ESGetNode("characters", "1", "1");
getNode.outputs(Arrays.<Symbol>asList(id_ref, name_ref, version_ref));
List<Symbol> outputs = Arrays.<Symbol>asList(id_ref, name_ref, version_ref);
ESGetNode getNode = newGetNode("characters", outputs, "1");
plan = new Plan();
plan.add(getNode);
plan.expectsAffectedRows(false);
Expand Down

0 comments on commit 33acebe

Please sign in to comment.