From 243405a9a4379499a4506f942214618f90e47355 Mon Sep 17 00:00:00 2001 From: adeneche Date: Tue, 19 Jan 2016 13:33:22 -0800 Subject: [PATCH 1/4] DRILL-4261: Add support for RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING --- .../apache/drill/exec/opt/BasicOptimizer.java | 2 +- .../drill/exec/physical/config/WindowPOP.java | 36 +++++++++++++--- .../impl/window/FrameSupportTemplate.java | 17 ++++++-- .../impl/window/NoFrameSupportTemplate.java | 3 +- .../exec/physical/impl/window/Partition.java | 3 ++ .../impl/window/WindowFrameRecordBatch.java | 8 ++-- .../physical/impl/window/WindowFramer.java | 3 +- .../physical/impl/window/WindowFunction.java | 43 ++++++++++--------- .../exec/planner/physical/WindowPrel.java | 4 +- .../parser/UnsupportedOperatorsVisitor.java | 5 ++- .../physical/impl/window/TestWindowFrame.java | 10 +++++ .../src/test/resources/window/q3.sql | 9 ++++ .../src/test/resources/window/q4.sql | 9 ++++ 13 files changed, 111 insertions(+), 41 deletions(-) create mode 100644 exec/java-exec/src/test/resources/window/q3.sql create mode 100644 exec/java-exec/src/test/resources/window/q4.sql diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 6e70506d229..018daa451ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -158,7 +158,7 @@ public PhysicalOperator visitWindow(final Window window, final Object value) thr input = new Sort(input, ods, false); return new WindowPOP(input, window.getWithins(), window.getAggregations(), - window.getOrderings(), window.getStart(), window.getEnd()); + window.getOrderings(), null, null); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java index 5926a06ed02..46c8d072430 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.calcite.rex.RexWindowBound; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; import org.apache.drill.exec.physical.base.AbstractSingle; @@ -33,15 +34,15 @@ public class WindowPOP extends AbstractSingle { private final NamedExpression[] withins; private final NamedExpression[] aggregations; private final Order.Ordering[] orderings; - private final long start; - private final long end; + private final Bound start; + private final Bound end; public WindowPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("within") NamedExpression[] withins, @JsonProperty("aggregations") NamedExpression[] aggregations, @JsonProperty("orderings") Order.Ordering[] orderings, - @JsonProperty("start") long start, - @JsonProperty("end") long end) { + @JsonProperty("start") Bound start, + @JsonProperty("end") Bound end) { super(child); this.withins = withins; this.aggregations = aggregations; @@ -65,11 +66,11 @@ public int getOperatorType() { return UserBitShared.CoreOperatorType.WINDOW_VALUE; } - public long getStart() { + public Bound getStart() { return start; } - public long getEnd() { + public Bound getEnd() { return end; } @@ -84,4 +85,27 @@ public NamedExpression[] getWithins() { public Order.Ordering[] getOrderings() { return orderings; } + + @JsonTypeName("windowBound") + public static class Bound { + private final boolean unbounded; + private final long offset; + + public Bound(@JsonProperty("unbounded") boolean unbounded, @JsonProperty("offset") long offset) { + this.unbounded = unbounded; + this.offset = offset; + } + + public boolean isUnbounded() { + return unbounded; + } + + public long getOffset() { + return offset; + } + } + + public static Bound newBound(RexWindowBound windowBound) { + return new Bound(windowBound.isUnbounded(), Long.MIN_VALUE); //TODO: Get offset to work + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java index 16c751349fb..d69e0c00fe7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java @@ -20,6 +20,7 @@ import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -52,9 +53,11 @@ public abstract class FrameSupportTemplate implements WindowFramer { private Partition partition; + private boolean unboundedFollowing; // true if the frame is of the form RANGE BETWEEN X AND UNBOUNDED FOLLOWING + @Override public void setup(final List batches, final VectorContainer container, final OperatorContext oContext, - final boolean requireFullPartition) throws SchemaChangeException { + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException { this.container = container; this.batches = batches; @@ -65,6 +68,7 @@ public void setup(final List batches, final VectorContainer con partition = null; this.requireFullPartition = requireFullPartition; + unboundedFollowing = popConfig.getEnd().isUnbounded(); } private void allocateInternal() { @@ -217,6 +221,7 @@ private long aggregatePeers(final int start) throws SchemaChangeException { VectorAccessible last = current; long length = 0; + final long remaining = partition.getLength(); // a single frame can include rows from multiple batches // start processing first batch and, if necessary, move to next batches @@ -226,8 +231,14 @@ private long aggregatePeers(final int start) throws SchemaChangeException { // for every remaining row in the partition, count it if it's a peer row for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) { - if (!isPeer(start, current, row, batch)) { - break; + if (unboundedFollowing) { + if (length >= remaining) { + break; + } + } else { + if (!isPeer(start, current, row, batch)) { + break; + } } evaluatePeer(row); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java index ac1eefc9965..21dfbba91e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java @@ -20,6 +20,7 @@ import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -53,7 +54,7 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { @Override public void setup(final List batches, final VectorContainer container, final OperatorContext oContext, - final boolean requireFullPartition) throws SchemaChangeException { + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException { this.container = container; this.batches = batches; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java index 66cf72060ad..92bff6e0d84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java @@ -45,6 +45,9 @@ public long getRemaining() { return remaining; } + public long getLength() { + return length; + } /** * @param length number of rows in this partition * @param partial if true, then length is not the full length of the partition but just the number of rows in the diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index d6be1eb9bdd..46a6c0ea244 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -212,7 +212,7 @@ private boolean canDoWork() { final boolean frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last); for (final WindowFunction function : functions) { - if (!function.canDoWork(batches.size(), hasOrderBy, frameEndReached, partitionEndReached)) { + if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) { return false; } } @@ -279,7 +279,7 @@ private void createFramers(VectorAccessible batch) throws SchemaChangeException, final WindowFunction winfun = WindowFunction.fromExpression(call); if (winfun.materialize(ne, container, context.getFunctionRegistry())) { functions.add(winfun); - requireFullPartition |= winfun.requiresFullPartition(hasOrderBy); + requireFullPartition |= winfun.requiresFullPartition(popConfig); if (winfun.supportsCustomFrames()) { useCustomFrame = true; @@ -311,13 +311,13 @@ private void createFramers(VectorAccessible batch) throws SchemaChangeException, int index = 0; if (useDefaultFrame) { framers[index] = generateFramer(keyExprs, orderExprs, functions, false); - framers[index].setup(batches, container, oContext, requireFullPartition); + framers[index].setup(batches, container, oContext, requireFullPartition, popConfig); index++; } if (useCustomFrame) { framers[index] = generateFramer(keyExprs, orderExprs, functions, true); - framers[index].setup(batches, container, oContext, requireFullPartition); + framers[index].setup(batches, container, oContext, requireFullPartition, popConfig); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java index 9b985c0cf3d..3d2d0fc3c8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; @@ -32,7 +33,7 @@ public interface WindowFramer { TemplateClassDefinition FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class); void setup(final List batches, final VectorContainer container, final OperatorContext operatorContext, - final boolean requireFullPartition) throws SchemaChangeException; + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException; /** * process the inner batch and write the aggregated values in the container diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java index 1c712972c9a..b267ed6bb7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionLookupContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; @@ -95,23 +96,23 @@ static WindowFunction fromExpression(final FunctionCall call) { abstract boolean supportsCustomFrames(); /** - * @param hasOrderBy window definition contains an ORDER BY clause + * @param pop window group definition * @return true if this window function requires all batches of current partition to be available before processing * the first batch */ - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return true; } /** * @param numBatchesAvailable number of batches available for current partition - * @param hasOrderBy window definition contains an ORDER BY clause + * @param pop window group definition * @param frameEndReached we found the last row of the first batch's frame * @param partitionEndReached all batches of current partition are available * * @return true if this window function can process the first batch immediately */ - public boolean canDoWork(final int numBatchesAvailable, final boolean hasOrderBy, final boolean frameEndReached, + public boolean canDoWork(final int numBatchesAvailable, final WindowPOP pop, final boolean frameEndReached, final boolean partitionEndReached) { return partitionEndReached; } @@ -155,13 +156,13 @@ void generateCode(ClassGenerator cg) { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { - return !hasOrderBy; + public boolean requiresFullPartition(final WindowPOP pop) { + return pop.getOrderings().length == 0 || pop.getEnd().isUnbounded(); } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { - return partitionEndReached || (hasOrderBy && frameEndReached); + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { + return partitionEndReached || (!requiresFullPartition(pop) && frameEndReached); } @Override @@ -212,18 +213,18 @@ boolean materialize(final NamedExpression ne, final VectorContainer batch, Funct } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { // CUME_DIST, PERCENT_RANK and NTILE require the length of current partition before processing it's first batch return type == Type.CUME_DIST || type == Type.PERCENT_RANK || type == Type.NTILE; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; // for CUME_DIST, PERCENT_RANK and NTILE we need the full partition // otherwise we can process the first batch immediately - return partitionEndReached || ! requiresFullPartition(hasOrderBy); + return partitionEndReached || ! requiresFullPartition(pop); } @Override @@ -319,12 +320,12 @@ boolean materialize(final NamedExpression ne, final VectorContainer batch, final } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { return partitionEndReached || numBatchesAvailable > 1; } @@ -389,12 +390,12 @@ void generateCode(ClassGenerator cg) { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; return true; } @@ -449,13 +450,13 @@ void generateCode(ClassGenerator cg) { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { - return !hasOrderBy; + public boolean requiresFullPartition(final WindowPOP pop) { + return pop.getOrderings().length == 0 || pop.getEnd().isUnbounded(); } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { - return partitionEndReached || (hasOrderBy && frameEndReached); + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { + return partitionEndReached || (!requiresFullPartition(pop) && frameEndReached); } @Override @@ -533,12 +534,12 @@ void generateCode(final ClassGenerator cg) { } @Override - public boolean requiresFullPartition(boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; return true; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java index c27b547c65c..51d34808974 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java @@ -105,8 +105,8 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws withins.toArray(new NamedExpression[withins.size()]), aggs.toArray(new NamedExpression[aggs.size()]), orderings.toArray(new Order.Ordering[orderings.size()]), - Long.MIN_VALUE, //TODO: Get first/last to work - Long.MIN_VALUE); + WindowPOP.newBound(window.lowerBound), + WindowPOP.newBound(window.upperBound)); creator.addMetadata(this, windowPOP); return windowPOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java index effe022ccb9..0a134cba4f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java @@ -178,13 +178,14 @@ public SqlNode visit(SqlCall sqlCall) { // it is a default frame boolean isSupported = (lowerBound == null && upperBound == null); - // When OVER clause contain an ORDER BY clause the following frames are equivalent to the default frame: + // When OVER clause contain an ORDER BY clause the following frames are supported: // RANGE UNBOUNDED PRECEDING // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + // RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING if(window.getOrderList().size() != 0 && !window.isRows() && SqlWindow.isUnboundedPreceding(lowerBound) - && (upperBound == null || SqlWindow.isCurrentRow(upperBound))) { + && (upperBound == null || SqlWindow.isCurrentRow(upperBound) || SqlWindow.isUnboundedFollowing(upperBound))) { isSupported = true; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 10abbff8b1d..5e9a94e4da3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -106,6 +106,16 @@ public void testMultipleFramers() throws Exception { ); } + @Test + public void testUnboundedFollowing() throws Exception { + testBuilder() + .sqlQuery(String.format(getFile("window/q3.sql"), TEST_RES_PATH)) + .ordered() + .sqlBaselineQuery(String.format(getFile("window/q4.sql"), TEST_RES_PATH)) + .build() + .run(); + } + /** * 2 batches with 2 partitions (position_id column), each batch contains a different partition */ diff --git a/exec/java-exec/src/test/resources/window/q3.sql b/exec/java-exec/src/test/resources/window/q3.sql new file mode 100644 index 00000000000..0efb137c52a --- /dev/null +++ b/exec/java-exec/src/test/resources/window/q3.sql @@ -0,0 +1,9 @@ +SELECT + position_id, + employee_id, + LAST_VALUE(employee_id) + OVER(PARTITION BY position_id + ORDER by employee_id + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `last_value` +FROM + dfs_test.`%s/window/b4.p4` diff --git a/exec/java-exec/src/test/resources/window/q4.sql b/exec/java-exec/src/test/resources/window/q4.sql new file mode 100644 index 00000000000..5e1fb22f5ff --- /dev/null +++ b/exec/java-exec/src/test/resources/window/q4.sql @@ -0,0 +1,9 @@ +SELECT + position_id, + employee_id, + MAX(employee_id) OVER(PARTITION BY position_id) AS `last_value` +FROM ( + SELECT * + FROM dfs_test.`%s/window/b4.p4` + ORDER BY position_id, employee_id +) From 8b7b35804bfd80868a492dd93fa5420412310613 Mon Sep 17 00:00:00 2001 From: adeneche Date: Mon, 25 Jan 2016 11:53:00 -0800 Subject: [PATCH 2/4] FrameSupportTemplate doesn't need to use Partition class --- .../impl/window/FrameSupportTemplate.java | 61 ++++++++++--------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java index d69e0c00fe7..8b431200813 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java @@ -51,7 +51,9 @@ public abstract class FrameSupportTemplate implements WindowFramer { // true when at least one window function needs to process all batches of a partition before passing any batch downstream private boolean requireFullPartition; - private Partition partition; + private long remainingRows; // num unprocessed rows in current partition + private long remainingPeers; // num unprocessed peer rows in current frame + private boolean partialPartition; // true if we remainingRows only account for the current batch and more batches are expected for the current partition private boolean unboundedFollowing; // true if the frame is of the form RANGE BETWEEN X AND UNBOUNDED FOLLOWING @@ -65,7 +67,6 @@ public void setup(final List batches, final VectorContainer con allocateInternal(); outputCount = 0; - partition = null; this.requireFullPartition = requireFullPartition; unboundedFollowing = popConfig.getEnd().isUnbounded(); @@ -78,6 +79,10 @@ private void allocateInternal() { } } + private boolean isPartitionDone() { + return !partialPartition && remainingRows == 0; + } + /** * processes all rows of the first batch. */ @@ -92,37 +97,36 @@ public void doWork() throws DrillException { outputCount = current.getRecordCount(); while (currentRow < outputCount) { - if (partition != null) { - assert currentRow == 0 : "pending windows are only expected at the start of the batch"; - - // we have a pending window we need to handle from a previous call to doWork() - logger.trace("we have a pending partition {}", partition); + if (!isPartitionDone()) { + // we have a pending partition we need to handle from a previous call to doWork() + assert currentRow == 0 : "pending partitions are only expected at the start of the batch"; + logger.trace("we have a pending partition {}", remainingRows); if (!requireFullPartition) { // we didn't compute the whole partition length in the previous partition, we need to update the length now - updatePartitionSize(partition, currentRow); + updatePartitionSize(currentRow); } } else { newPartition(current, currentRow); } currentRow = processPartition(currentRow); - if (partition.isDone()) { + if (isPartitionDone()) { cleanPartition(); } } } private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException { - partition = new Partition(); - updatePartitionSize(partition, currentRow); + remainingRows = 0; + remainingPeers = 0; + updatePartitionSize(currentRow); setupPartition(current, container); saveFirstValue(currentRow); } private void cleanPartition() { - partition = null; resetValues(); for (VectorWrapper vw : internal) { if ((vw.getValueVector() instanceof BaseDataValueVector)) { @@ -138,14 +142,14 @@ private void cleanPartition() { * @throws DrillException if it can't write into the container */ private int processPartition(final int currentRow) throws DrillException { - logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount); + logger.trace("{} rows remaining to process, currentRow: {}, outputCount: {}", remainingRows, currentRow, outputCount); setupWriteFirstValue(internal, container); int row = currentRow; // process all rows except the last one of the batch/partition - while (row < outputCount && !partition.isDone()) { + while (row < outputCount && !isPartitionDone()) { processRow(row); row++; @@ -155,27 +159,26 @@ private int processPartition(final int currentRow) throws DrillException { } private void processRow(final int row) throws DrillException { - if (partition.isFrameDone()) { + if (remainingPeers == 0) { // because all peer rows share the same frame, we only need to compute and aggregate the frame once - final long peers = aggregatePeers(row); - partition.newFrame(peers); + remainingPeers = aggregatePeers(row); } - outputRow(row, partition); + outputRow(row); writeLastValue(frameLastRow, row); - partition.rowAggregated(); + remainingRows--; + remainingPeers--; } /** * updates partition's length after computing the number of rows for the current the partition starting at the specified * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch */ - private void updatePartitionSize(final Partition partition, final int start) { + private void updatePartitionSize(final int start) { logger.trace("compute partition size starting from {} on {} batches", start, batches.size()); long length = 0; - boolean lastBatch = false; int row = start; // count all rows that are in the same partition of start @@ -202,12 +205,16 @@ private void updatePartitionSize(final Partition partition, final int start) { if (!requireFullPartition) { // this is the last batch of current partition if - lastBatch = row < outputCount // partition ends before the end of the batch - || batches.size() == 1 // it's the last available batch + boolean lastBatch = row < outputCount // partition ends before the end of the batch + || batches.size() == 1 // it's the last available batch || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition + + partialPartition = !lastBatch; + } else { + partialPartition = false; } - partition.updateLength(length, !(requireFullPartition || lastBatch)); + remainingRows += length; } /** @@ -221,7 +228,6 @@ private long aggregatePeers(final int start) throws SchemaChangeException { VectorAccessible last = current; long length = 0; - final long remaining = partition.getLength(); // a single frame can include rows from multiple batches // start processing first batch and, if necessary, move to next batches @@ -232,7 +238,7 @@ private long aggregatePeers(final int start) throws SchemaChangeException { // for every remaining row in the partition, count it if it's a peer row for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) { if (unboundedFollowing) { - if (length >= remaining) { + if (length >= remainingRows) { break; } } else { @@ -282,9 +288,8 @@ public void cleanup() { * called once for each row after we evaluate all peer rows. Used to write a value in the row * * @param outIndex index of row - * @param partition object used by "computed" window functions */ - public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition); + public abstract void outputRow(@Named("outIndex") int outIndex); /** * Called once per partition, before processing the partition. Used to setup read/write vectors From fa14946ff9d9d63a4285d84dfa5375946ce13623 Mon Sep 17 00:00:00 2001 From: adeneche Date: Mon, 25 Jan 2016 12:18:06 -0800 Subject: [PATCH 3/4] pass "isRows" to WindowPOP --- .../java/org/apache/drill/exec/opt/BasicOptimizer.java | 2 +- .../org/apache/drill/exec/physical/config/WindowPOP.java | 9 ++++++++- .../apache/drill/exec/planner/physical/WindowPrel.java | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 018daa451ba..3f064d4c57a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -158,7 +158,7 @@ public PhysicalOperator visitWindow(final Window window, final Object value) thr input = new Sort(input, ods, false); return new WindowPOP(input, window.getWithins(), window.getAggregations(), - window.getOrderings(), null, null); + window.getOrderings(), false, null, null); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java index 46c8d072430..03d8ea9ad7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -34,6 +34,7 @@ public class WindowPOP extends AbstractSingle { private final NamedExpression[] withins; private final NamedExpression[] aggregations; private final Order.Ordering[] orderings; + private final boolean rows; private final Bound start; private final Bound end; @@ -41,19 +42,21 @@ public WindowPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("within") NamedExpression[] withins, @JsonProperty("aggregations") NamedExpression[] aggregations, @JsonProperty("orderings") Order.Ordering[] orderings, + @JsonProperty("rows") boolean rows, @JsonProperty("start") Bound start, @JsonProperty("end") Bound end) { super(child); this.withins = withins; this.aggregations = aggregations; this.orderings = orderings; + this.rows = rows; this.start = start; this.end = end; } @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new WindowPOP(child, withins, aggregations, orderings, start, end); + return new WindowPOP(child, withins, aggregations, orderings, rows, start, end); } @Override @@ -86,6 +89,10 @@ public Order.Ordering[] getOrderings() { return orderings; } + public boolean isRows() { + return rows; + } + @JsonTypeName("windowBound") public static class Bound { private final boolean unbounded; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java index 51d34808974..1a89bd7387c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java @@ -105,6 +105,7 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws withins.toArray(new NamedExpression[withins.size()]), aggs.toArray(new NamedExpression[aggs.size()]), orderings.toArray(new Order.Ordering[orderings.size()]), + window.isRows, WindowPOP.newBound(window.lowerBound), WindowPOP.newBound(window.upperBound)); From d5b7065e43923e4b25df9659f8a680922f0f9f3d Mon Sep 17 00:00:00 2001 From: adeneche Date: Mon, 25 Jan 2016 12:19:22 -0800 Subject: [PATCH 4/4] DRILL-4262: add support for ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW --- .../impl/window/FrameSupportTemplate.java | 45 ++++++++++++++----- .../parser/UnsupportedOperatorsVisitor.java | 8 ++++ .../java/org/apache/drill/TestBuilder.java | 4 ++ .../drill/exec/TestWindowFunctions.java | 29 ------------ .../physical/impl/window/TestWindowFrame.java | 30 +++++++++++-- .../aggregate_rows_unbounded_current.sql | 4 ++ ...regate_rows_unbounded_current_baseline.sql | 3 ++ .../last_value_rows_unbounded_current.sql | 5 +++ ..._value_rows_unbounded_current_baseline.sql | 5 +++ 9 files changed, 88 insertions(+), 45 deletions(-) create mode 100644 exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql create mode 100644 exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql create mode 100644 exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql create mode 100644 exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java index 8b431200813..e56da597bf6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java @@ -55,6 +55,7 @@ public abstract class FrameSupportTemplate implements WindowFramer { private long remainingPeers; // num unprocessed peer rows in current frame private boolean partialPartition; // true if we remainingRows only account for the current batch and more batches are expected for the current partition + private boolean isRows; // true if ROWS frame private boolean unboundedFollowing; // true if the frame is of the form RANGE BETWEEN X AND UNBOUNDED FOLLOWING @Override @@ -68,6 +69,7 @@ public void setup(final List batches, final VectorContainer con outputCount = 0; + this.isRows = popConfig.isRows(); this.requireFullPartition = requireFullPartition; unboundedFollowing = popConfig.getEnd().isUnbounded(); } @@ -146,29 +148,48 @@ private int processPartition(final int currentRow) throws DrillException { setupWriteFirstValue(internal, container); - int row = currentRow; + if (isRows) { + return processROWS(currentRow); + } else { + return processRANGE(currentRow); + } + } + + private int processROWS(int row) throws DrillException { + //TODO we only need to call these once per batch + setupEvaluatePeer(current, container); + setupReadLastValue(current, container); - // process all rows except the last one of the batch/partition while (row < outputCount && !isPartitionDone()) { - processRow(row); + logger.trace("aggregating row {}", row); + evaluatePeer(row); + + outputRow(row); + writeLastValue(row, row); + remainingRows--; row++; } return row; } - private void processRow(final int row) throws DrillException { - if (remainingPeers == 0) { - // because all peer rows share the same frame, we only need to compute and aggregate the frame once - remainingPeers = aggregatePeers(row); - } + private int processRANGE(int row) throws DrillException { + while (row < outputCount && !isPartitionDone()) { + if (remainingPeers == 0) { + // because all peer rows share the same frame, we only need to compute and aggregate the frame once + remainingPeers = aggregatePeers(row); + } + + outputRow(row); + writeLastValue(frameLastRow, row); - outputRow(row); - writeLastValue(frameLastRow, row); + remainingRows--; + remainingPeers--; + row++; + } - remainingRows--; - remainingPeers--; + return row; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java index 0a134cba4f9..3a02271cbb6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java @@ -189,6 +189,14 @@ public SqlNode visit(SqlCall sqlCall) { isSupported = true; } + // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + // is supported with and without the ORDER BY clause + if (window.isRows() + && SqlWindow.isUnboundedPreceding(lowerBound) + && (upperBound == null || SqlWindow.isCurrentRow(upperBound))) { + isSupported = true; + } + // When OVER clause doesn't contain an ORDER BY clause, the following are equivalent to the default frame: // RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java index 330ff127c03..25558042c6b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java @@ -346,6 +346,10 @@ public BaselineQueryTestBuilder sqlBaselineQuery(String baselineQuery) { baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches); } + public BaselineQueryTestBuilder sqlBaselineQuery(String query, String ...replacements) { + return sqlBaselineQuery(String.format(query, replacements)); + } + // provide a path to a file containing a SQL query to use as a baseline public BaselineQueryTestBuilder sqlBaselineQueryFromFile(String baselineQueryFilename) throws IOException { String baselineQuery = BaseTestQuery.getFile(baselineQueryFilename); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java index d6cd3c7219b..8055774f02d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java @@ -154,35 +154,6 @@ public void testWindowFrame() throws Exception { } } - @Test(expected = UnsupportedFunctionException.class) // DRILL-3188 - public void testRowsUnboundedPreceding() throws Exception { - try { - final String query = "explain plan for select sum(n_nationKey) over(partition by n_nationKey order by n_nationKey \n" + - "rows UNBOUNDED PRECEDING)" + - "from cp.`tpch/nation.parquet` t \n" + - "order by n_nationKey"; - - test(query); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - throw ex; - } - } - - @Test(expected = UnsupportedFunctionException.class) // DRILL-3359 - public void testFramesDefinedInWindowClause() throws Exception { - try { - final String query = "explain plan for select sum(n_nationKey) over w \n" + - "from cp.`tpch/nation.parquet` \n" + - "window w as (partition by n_nationKey order by n_nationKey rows UNBOUNDED PRECEDING)"; - - test(query); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - throw ex; - } - } - @Test(expected = UnsupportedFunctionException.class) // DRILL-3326 public void testWindowWithAlias() throws Exception { try { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 5e9a94e4da3..634b342a063 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -48,7 +48,7 @@ public static void setupMSortBatchSize() { private DrillTestWrapper buildWindowQuery(final String tableName, final boolean withPartitionBy, final int numBatches) throws Exception { return testBuilder() - .sqlQuery(String.format(getFile("window/q1.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id)":"()")) + .sqlQuery(getFile("window/q1.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id)":"()") .ordered() .csvBaselineFile("window/" + tableName + (withPartitionBy ? ".pby" : "") + ".tsv") .baselineColumns("count", "sum") @@ -59,7 +59,7 @@ private DrillTestWrapper buildWindowQuery(final String tableName, final boolean private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName, final boolean withPartitionBy, final int numBatches) throws Exception { return testBuilder() - .sqlQuery(String.format(getFile("window/q2.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id order by sub)" : "(order by sub)")) + .sqlQuery(getFile("window/q2.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id order by sub)" : "(order by sub)") .ordered() .csvBaselineFile("window/" + tableName + (withPartitionBy ? ".pby" : "") + ".oby.tsv") .baselineColumns("count", "sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank") @@ -109,9 +109,31 @@ public void testMultipleFramers() throws Exception { @Test public void testUnboundedFollowing() throws Exception { testBuilder() - .sqlQuery(String.format(getFile("window/q3.sql"), TEST_RES_PATH)) + .sqlQuery(getFile("window/q3.sql"), TEST_RES_PATH) .ordered() - .sqlBaselineQuery(String.format(getFile("window/q4.sql"), TEST_RES_PATH)) + .sqlBaselineQuery(getFile("window/q4.sql"), TEST_RES_PATH) + .build() + .run(); + } + + @Test + public void testAggregateRowsUnboundedAndCurrentRow() throws Exception { + final String table = "dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/aggregate_rows_unbounded_current.sql"), table) + .ordered() + .sqlBaselineQuery(getFile("window/aggregate_rows_unbounded_current_baseline.sql"), table) + .build() + .run(); + } + + @Test + public void testLastValueRowsUnboundedAndCurrentRow() throws Exception { + final String table = "dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/last_value_rows_unbounded_current.sql"), table) + .unOrdered() + .sqlBaselineQuery(getFile("window/last_value_rows_unbounded_current_baseline.sql"), table) .build() .run(); } diff --git a/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql new file mode 100644 index 00000000000..6b163d85281 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql @@ -0,0 +1,4 @@ +SELECT + COUNT(*) OVER(PARTITION BY position_id ORDER BY sub ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `count` +FROM + %s \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql new file mode 100644 index 00000000000..2d13c006093 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql @@ -0,0 +1,3 @@ +SELECT + COUNT(*) OVER(PARTITION BY position_id ORDER BY sub, employee_id) AS `count` +FROM %s \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql new file mode 100644 index 00000000000..5d7eedd4d20 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql @@ -0,0 +1,5 @@ +SELECT + employee_id, + LAST_VALUE(employee_id) OVER(PARTITION BY position_id ORDER BY sub ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `last_value` +FROM + %s \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql new file mode 100644 index 00000000000..432f35230a8 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql @@ -0,0 +1,5 @@ +SELECT + employee_id, + employee_id AS `last_value` +FROM + %s \ No newline at end of file