From 8bcde155d23550d6733f4dcb18868bdc3fb3b119 Mon Sep 17 00:00:00 2001
From: adeneche
Date: Mon, 11 Jan 2016 14:47:32 -0800
Subject: [PATCH 1/2] DRILL-4270: Create a separate WindowFramer that supports
the FRAME clause
separate DefaultFrameTemplate into 2 implementations: one that supports custom frames (aggregations, first_value, last_value) and one that doesn't
---
.../impl/window/CustomFrameTemplate.java | 315 ++++++++++++++++++
.../impl/window/DefaultFrameTemplate.java | 111 +-----
.../impl/window/WindowFrameRecordBatch.java | 153 +++++----
.../physical/impl/window/WindowFramer.java | 3 +-
.../physical/impl/window/WindowFunction.java | 53 ++-
.../physical/impl/window/TestWindowFrame.java | 8 +
6 files changed, 475 insertions(+), 168 deletions(-)
create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java
new file mode 100644
index 00000000000..17a1d21471d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import javax.inject.Named;
+import java.util.List;
+
+
+/**
+ * WindowFramer implementation that supports the FRAME clause. Can be used with FIRST_VALUE, LAST_VALUE and
+ * all aggregate functions
+ */
+public abstract class CustomFrameTemplate implements WindowFramer {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomFrameTemplate.class);
+
+ private VectorContainer container;
+ private VectorContainer internal;
+ private List<WindowDataBatch> batches;
+ private int outputCount; // number of rows in currently/last processed batch
+
+ private WindowDataBatch current;
+
+ private int frameLastRow;
+
+ // true when at least one window function needs to process all batches of a partition before passing any batch downstream
+ private boolean requireFullPartition;
+
+ private Partition partition;
+
+ @Override
+ public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
+ final boolean requireFullPartition) throws SchemaChangeException {
+ this.container = container;
+ this.batches = batches;
+
+ internal = new VectorContainer(oContext);
+ allocateInternal();
+
+ outputCount = 0;
+ partition = null;
+
+ this.requireFullPartition = requireFullPartition;
+ }
+
+ private void allocateInternal() {
+ for (VectorWrapper<?> w : container) {
+ ValueVector vv = internal.addOrGet(w.getField());
+ vv.allocateNew();
+ }
+ }
+
+ /**
+ * processes all rows of the first batch.
+ */
+ @Override
+ public void doWork() throws DrillException {
+ int currentRow = 0;
+
+ this.current = batches.get(0);
+
+ setupSaveFirstValue(current, internal);
+
+ outputCount = current.getRecordCount();
+
+ while (currentRow < outputCount) {
+ if (partition != null) {
+ assert currentRow == 0 : "pending windows are only expected at the start of the batch";
+
+ // we have a pending window we need to handle from a previous call to doWork()
+ logger.trace("we have a pending partition {}", partition);
+
+ if (!requireFullPartition) {
+ // we didn't compute the whole partition length in the previous partition, we need to update the length now
+ updatePartitionSize(partition, currentRow);
+ }
+ } else {
+ newPartition(current, currentRow);
+ }
+
+ currentRow = processPartition(currentRow);
+ if (partition.isDone()) {
+ cleanPartition();
+ }
+ }
+ }
+
+ private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+ partition = new Partition();
+ updatePartitionSize(partition, currentRow);
+
+ setupPartition(current, container);
+ saveFirstValue(currentRow);
+ }
+
+ private void cleanPartition() {
+ partition = null;
+ resetValues();
+ for (VectorWrapper<?> vw : internal) {
+ if ((vw.getValueVector() instanceof BaseDataValueVector)) {
+ ((BaseDataValueVector) vw.getValueVector()).reset();
+ }
+ }
+ }
+
+ /**
+ * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
+ * @param currentRow first unprocessed row
+ * @return index of next unprocessed row
+ * @throws DrillException if it can't write into the container
+ */
+ private int processPartition(final int currentRow) throws DrillException {
+ logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
+
+ setupWriteFirstValue(internal, container);
+
+ int row = currentRow;
+
+ // process all rows except the last one of the batch/partition
+ while (row < outputCount && !partition.isDone()) {
+ processRow(row);
+
+ row++;
+ }
+
+ return row;
+ }
+
+ private void processRow(final int row) throws DrillException {
+ if (partition.isFrameDone()) {
+ // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+ final long peers = aggregatePeers(row);
+ partition.newFrame(peers);
+ }
+
+ outputRow(row, partition);
+ writeLastValue(frameLastRow, row);
+
+ partition.rowAggregated();
+ }
+
+ /**
+ * updates partition's length after computing the number of rows for the current partition starting at the specified
+ * row of the first batch. If !requireFullPartition, this method will only count the rows in the current batch
+ */
+ private void updatePartitionSize(final Partition partition, final int start) {
+ logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
+
+ long length = 0;
+ boolean lastBatch = false;
+ int row = start;
+
+ // count all rows that are in the same partition of start
+ // keep increasing length until we find first row of next partition or we reach the very last batch
+
+ outer:
+ for (WindowDataBatch batch : batches) {
+ final int recordCount = batch.getRecordCount();
+
+ // check first container from start row, and subsequent containers from first row
+ for (; row < recordCount; row++, length++) {
+ if (!isSamePartition(start, current, row, batch)) {
+ break outer;
+ }
+ }
+
+ if (!requireFullPartition) {
+ // we are only interested in the first batch's records
+ break;
+ }
+
+ row = 0;
+ }
+
+ if (!requireFullPartition) {
+ // this is the last batch of current partition if
+ lastBatch = row < outputCount // partition ends before the end of the batch
+ || batches.size() == 1 // it's the last available batch
+ || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition
+ }
+
+ partition.updateLength(length, !(requireFullPartition || lastBatch));
+ }
+
+ /**
+ * aggregates all peer rows of current row
+ * @param start starting row of the current frame
+ * @return num peer rows for current row
+ * @throws SchemaChangeException
+ */
+ private long aggregatePeers(final int start) throws SchemaChangeException {
+ logger.trace("aggregating rows starting from {}", start);
+
+ VectorAccessible last = current;
+ long length = 0;
+
+ // a single frame can include rows from multiple batches
+ // start processing first batch and, if necessary, move to next batches
+ for (WindowDataBatch batch : batches) {
+ setupEvaluatePeer(batch, container);
+ final int recordCount = batch.getRecordCount();
+
+ // for every remaining row in the partition, count it if it's a peer row
+ for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
+ if (!isPeer(start, current, row, batch)) {
+ break;
+ }
+
+ evaluatePeer(row);
+ last = batch;
+ frameLastRow = row;
+ }
+ }
+
+ setupReadLastValue(last, container);
+
+ return length;
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputCount;
+ }
+
+ // we need this abstract method for code generation
+ @Override
+ public void cleanup() {
+ logger.trace("clearing internal");
+ internal.clear();
+ }
+
+ /**
+ * called once for each peer row of the current frame.
+ * @param index of row to aggregate
+ */
+ public abstract void evaluatePeer(@Named("index") int index);
+ public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+ public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+ public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
+
+ public abstract void setupSaveFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+ public abstract void saveFirstValue(@Named("index") int index);
+ public abstract void setupWriteFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+ /**
+ * called once for each row after we evaluate all peer rows. Used to write a value in the row
+ *
+ * @param outIndex index of row
+ * @param partition object used by "computed" window functions
+ */
+ public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+
+ /**
+ * Called once per partition, before processing the partition. Used to setup read/write vectors
+ * @param incoming batch we will read from
+ * @param outgoing batch we will be writing to
+ *
+ * @throws SchemaChangeException
+ */
+ public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
+ @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+ /**
+ * reset all window functions
+ */
+ public abstract boolean resetValues();
+
+ /**
+ * compares two rows from different batches (can be the same), if they have the same value for the partition by
+ * expression
+ * @param b1Index index of first row
+ * @param b1 batch for first row
+ * @param b2Index index of second row
+ * @param b2 batch for second row
+ * @return true if the rows are in the same partition
+ */
+ public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+ @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+
+ /**
+ * compares two rows from different batches (can be the same), if they have the same value for the order by
+ * expression
+ * @param b1Index index of first row
+ * @param b1 batch for first row
+ * @param b2Index index of second row
+ * @param b2 batch for second row
+ * @return true if the rows are peers (have the same value for the order by expression)
+ */
+ public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+ @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
index d2ee9f1be77..fae15ba7a70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
@@ -20,7 +20,6 @@
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -31,6 +30,10 @@
import java.util.List;
+/**
+ * WindowFramer implementation that only supports the default frame. Can be used with LEAD, LAG, ROW_NUMBER, and
+ * all ranking functions
+ */
public abstract class DefaultFrameTemplate implements WindowFramer {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
@@ -40,16 +43,12 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
private List<WindowDataBatch> batches;
private int outputCount; // number of rows in currently/last processed batch
- private int frameLastRow;
+ private WindowDataBatch current;
// true when at least one window function needs to process all batches of a partition before passing any batch downstream
private boolean requireFullPartition;
- /**
- * current partition being processed.
- * Can span over multiple batches, so we may need to keep it between calls to doWork()
- */
- private Partition partition;
+ private Partition partition; // current partition being processed
@Override
public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
@@ -67,14 +66,7 @@ public void setup(final List batches, final VectorContainer con
this.requireFullPartition = requireFullPartition;
}
- private void allocateOutgoing() {
- for (VectorWrapper<?> w : container) {
- w.getValueVector().allocateNew();
- }
- }
-
private void allocateInternal() {
- // TODO we don't need to allocate all container's vectors, we can pass a specific list of vectors to allocate internally
for (VectorWrapper<?> w : container) {
ValueVector vv = internal.addOrGet(w.getField());
vv.allocateNew();
@@ -82,27 +74,14 @@ private void allocateInternal() {
}
/**
- * processes all rows of current batch:
- *
- * - compute aggregations
- * - compute window functions
- * - transfer remaining vectors from current batch to container
- *
+ * processes all rows of the first batch.
*/
@Override
public void doWork() throws DrillException {
int currentRow = 0;
- logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows",
- batches.size(), batches.get(0).getRecordCount());
+ this.current = batches.get(0);
- allocateOutgoing();
-
- final WindowDataBatch current = batches.get(0);
-
- setupCopyFirstValue(current, internal);
-
- // we need to store the record count explicitly, because we release current batch at the end of this call
outputCount = current.getRecordCount();
while (currentRow < outputCount) {
@@ -125,23 +104,6 @@ public void doWork() throws DrillException {
cleanPartition();
}
}
-
- // transfer "non aggregated" vectors
- for (VectorWrapper<?> vw : current) {
- ValueVector v = container.addOrGet(vw.getField());
- TransferPair tp = vw.getValueVector().makeTransferPair(v);
- tp.transfer();
- }
-
- for (VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputCount);
- }
-
- // because we are using the default frame, and we keep the aggregated value until we start a new frame
- // we can safely free the current batch
- batches.remove(0).clear();
-
- logger.trace("WindowFramer.doWork() END");
}
private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
@@ -149,7 +111,6 @@ private void newPartition(final WindowDataBatch current, final int currentRow) t
updatePartitionSize(partition, currentRow);
setupPartition(current, container);
- copyFirstValueToInternal(currentRow);
}
private void cleanPartition() {
@@ -164,7 +125,7 @@ private void cleanPartition() {
}
/**
- * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
+ * process all rows (computes and writes function values) of current batch that are part of current partition.
* @param currentRow first unprocessed row
* @return index of next unprocessed row
* @throws DrillException if it can't write into the container
@@ -172,9 +133,7 @@ private void cleanPartition() {
private int processPartition(final int currentRow) throws DrillException {
logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
- final VectorAccessible current = getCurrent();
setupCopyNext(current, container);
- setupPasteValues(internal, container);
copyPrevFromInternal();
@@ -228,12 +187,11 @@ private void copyPrevFromInternal() {
private void processRow(final int row) throws DrillException {
if (partition.isFrameDone()) {
// because all peer rows share the same frame, we only need to compute and aggregate the frame once
- final long peers = aggregatePeers(row);
+ final long peers = countPeers(row);
partition.newFrame(peers);
}
outputRow(row, partition);
- writeLastValue(frameLastRow, row);
partition.rowAggregated();
}
@@ -245,9 +203,6 @@ private void processRow(final int row) throws DrillException {
private void updatePartitionSize(final Partition partition, final int start) {
logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
- // current partition always starts from first batch
- final VectorAccessible first = getCurrent();
-
long length = 0;
boolean lastBatch = false;
int row = start;
@@ -261,7 +216,7 @@ private void updatePartitionSize(final Partition partition, final int start) {
// check first container from start row, and subsequent containers from first row
for (; row < recordCount; row++, length++) {
- if (!isSamePartition(start, first, row, batch)) {
+ if (!isSamePartition(start, current, row, batch)) {
break outer;
}
}
@@ -278,56 +233,37 @@ private void updatePartitionSize(final Partition partition, final int start) {
// this is the last batch of current partition if
lastBatch = row < outputCount // partition ends before the end of the batch
|| batches.size() == 1 // it's the last available batch
- || !isSamePartition(start, first, 0, batches.get(1)); // next batch contains a different partition
+ || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition
}
partition.updateLength(length, !(requireFullPartition || lastBatch));
}
/**
- * aggregates all peer rows of current row
+ * count number of peer rows for current row
* @param start starting row of the current frame
* @return num peer rows for current row
* @throws SchemaChangeException
*/
- private long aggregatePeers(final int start) throws SchemaChangeException {
- logger.trace("aggregating rows starting from {}", start);
-
- // current frame always starts from first batch
- final VectorAccessible first = getCurrent();
- VectorAccessible last = first;
+ private long countPeers(final int start) throws SchemaChangeException {
long length = 0;
// a single frame can include rows from multiple batches
// start processing first batch and, if necessary, move to next batches
for (WindowDataBatch batch : batches) {
- setupEvaluatePeer(batch, container);
final int recordCount = batch.getRecordCount();
// for every remaining row in the partition, count it if it's a peer row
- for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++) {
- if (!isPeer(start, first, row, batch)) {
+ for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
+ if (!isPeer(start, current, row, batch)) {
break;
}
-
- evaluatePeer(row);
- last = batch;
- frameLastRow = row;
}
}
- setupReadLastValue(last, container);
-
return length;
}
- /**
- * @return saved batch that will be processed in doWork()
- */
- private VectorAccessible getCurrent() {
- return batches.get(0);
- }
-
@Override
public int getOutputCount() {
return outputCount;
@@ -340,19 +276,6 @@ public void cleanup() {
internal.clear();
}
- /**
- * called once for each peer row of the current frame.
- * @param index of row to aggregate
- */
- public abstract void evaluatePeer(@Named("index") int index);
- public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
- public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
- public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
-
- public abstract void setupCopyFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
- public abstract void copyFirstValueToInternal(@Named("index") int index);
-
/**
* called once for each row after we evaluate all peer rows. Used to write a value in the row
*
@@ -380,8 +303,6 @@ public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
public abstract void copyNext(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
- public abstract void setupPasteValues(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-
/**
* copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 22617345683..1b124548724 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -28,6 +28,7 @@
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -42,11 +43,13 @@
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import com.google.common.collect.Lists;
import com.sun.codemodel.JExpr;
+import org.apache.drill.exec.vector.ValueVector;
/**
* support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
@@ -58,14 +61,14 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch {
private final RecordBatch incoming;
private List<WindowDataBatch> batches;
- private WindowFramer framer;
+ private WindowFramer[] framers;
private boolean hasOrderBy; // true if window definition contains an order-by clause
private final List functions = Lists.newArrayList();
- private boolean noMoreBatches;
+ private boolean noMoreBatches; // true when downstream returns NONE
private BatchSchema schema;
- private boolean shouldStop;
+ private boolean shouldStop; // true if we received an early termination request
public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
@@ -74,41 +77,7 @@ public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, Reco
}
/**
- * Let's assume we have the following 3 batches of data:
- *
- * +---------+--------+--------------+--------+
- * | b0 | b1 | b2 | b3 |
- * +----+----+--------+----+----+----+--------+
- * | p0 | p1 | p1 | p2 | p3 | p4 | p5 |
- * +----+----+--------+----+----+----+--------+
- *
- *
- * batch b0 contains partitions p0 and p1
- * batch b1 contains partition p1
- * batch b2 contains partitions p2 p3 and p4
- * batch b3 contains partition p5
- *
- *
- * when innerNext() is called:
- * call next(incoming), we receive and save b0 in a list of WindowDataBatch
- * we can't process b0 yet because we don't know if p1 has more rows upstream
- * call next(incoming), we receive and save b1
- * we can't process b0 yet for the same reason previously stated
- * call next(incoming), we receive and save b2
- * we process b0 (using the framer) and pass the container downstream
- * when innerNext() is called:
- * we process b1 and pass the container downstream, b0 and b1 are released from memory
- * when innerNext() is called:
- * call next(incoming), we receive and save b3
- * we process b2 and pass the container downstream, b2 is released from memory
- * when innerNext() is called:
- * call next(incoming) and receive NONE
- * we process b3 and pass the container downstream, b3 is released from memory
- * when innerNext() is called:
- * we return NONE
- *
- * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of
- * a partition. We can safely free a batch as soon as it has been processed.
+ * Hold incoming batches in memory until all window functions are ready to process the batch on top of the queue
*/
@Override
public IterOutcome innerNext() {
@@ -174,7 +143,7 @@ public IterOutcome innerNext() {
// process first saved batch, then release it
try {
- framer.doWork();
+ doWork();
} catch (DrillException e) {
context.fail(e);
cleanup();
@@ -188,6 +157,41 @@ public IterOutcome innerNext() {
return IterOutcome.OK;
}
+ private void doWork() throws DrillException {
+
+ final WindowDataBatch current = batches.get(0);
+ final int recordCount = current.getRecordCount();
+
+ logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows", batches.size(), recordCount);
+
+ // allocate outgoing vectors
+ for (VectorWrapper<?> w : container) {
+ w.getValueVector().allocateNew();
+ }
+
+ for (WindowFramer framer : framers) {
+ framer.doWork();
+ }
+
+ // transfer "non aggregated" vectors
+ for (VectorWrapper<?> vw : current) {
+ ValueVector v = container.addOrGet(vw.getField());
+ TransferPair tp = vw.getValueVector().makeTransferPair(v);
+ tp.transfer();
+ }
+
+ container.setRecordCount(recordCount);
+ for (VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(recordCount);
+ }
+
+ // we can safely free the current batch
+ current.clear();
+ batches.remove(0);
+
+ logger.trace("doWork() END");
+ }
+
/**
* @return true when all window functions are ready to process the current batch (it's the first batch currently
* held in memory)
@@ -204,8 +208,8 @@ private boolean canDoWork() {
final VectorAccessible last = batches.get(batches.size() - 1);
final int lastSize = last.getRecordCount();
- final boolean partitionEndReached = !framer.isSamePartition(currentSize - 1, current, lastSize - 1, last);
- final boolean frameEndReached = partitionEndReached || !framer.isPeer(currentSize - 1, current, lastSize - 1, last);
+ final boolean partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last);
+ final boolean frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last);
for (final WindowFunction function : functions) {
if (!function.canDoWork(batches.size(), hasOrderBy, frameEndReached, partitionEndReached)) {
@@ -234,7 +238,7 @@ protected void buildSchema() throws SchemaChangeException {
}
try {
- framer = createFramer(incoming);
+ createFramers(incoming);
} catch (IOException | ClassTransformationException e) {
throw new SchemaChangeException("Exception when creating the schema", e);
}
@@ -244,18 +248,17 @@ protected void buildSchema() throws SchemaChangeException {
}
}
- private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
- assert framer == null : "createFramer should only be called once";
+ private void createFramers(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
+ assert framers == null : "createFramer should only be called once";
- logger.trace("creating framer");
+ logger.trace("creating framer(s)");
final List<LogicalExpression> keyExprs = Lists.newArrayList();
final List<LogicalExpression> orderExprs = Lists.newArrayList();
boolean requireFullPartition = false;
- container.clear();
-
- functions.clear();
+ boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
+ boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
hasOrderBy = popConfig.getOrderings().length > 0;
@@ -277,12 +280,17 @@ private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeExc
if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
functions.add(winfun);
requireFullPartition |= winfun.requiresFullPartition(hasOrderBy);
+
+ if (winfun.supportsCustomFrames()) {
+ useCustomFrame = true;
+ } else {
+ useDefaultFrame = true;
+ }
}
}
- if (container.isSchemaChanged()) {
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- }
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setRecordCount(0);
// materialize partition by expressions
for (final NamedExpression ne : popConfig.getWithins()) {
@@ -294,15 +302,31 @@ private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeExc
orderExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(oe.getExpr(), batch, context.getFunctionRegistry()));
}
- final WindowFramer framer = generateFramer(keyExprs, orderExprs, functions);
- framer.setup(batches, container, oContext, requireFullPartition);
+ // count how many framers we need
+ int numFramers = useDefaultFrame ? 1 : 0;
+ numFramers += useCustomFrame ? 1 : 0;
+ assert numFramers > 0 : "No framer was needed!";
+
+ framers = new WindowFramer[numFramers];
+ int index = 0;
+ if (useDefaultFrame) {
+ framers[index] = generateFramer(keyExprs, orderExprs, functions, false);
+ framers[index].setup(batches, container, oContext, requireFullPartition);
+ index++;
+ }
- return framer;
+ if (useCustomFrame) {
+ framers[index] = generateFramer(keyExprs, orderExprs, functions, true);
+ framers[index].setup(batches, container, oContext, requireFullPartition);
+ }
}
private WindowFramer generateFramer(final List<LogicalExpression> keyExprs, final List<LogicalExpression> orderExprs,
- final List<WindowFunction> functions) throws IOException, ClassTransformationException {
- final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ final List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+
+ TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
+ WindowFramer.CUSTOM_TEMPLATE_DEFINITION : WindowFramer.DEFAULT_TEMPLATE_DEFINITION;
+ final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getFunctionRegistry());
{
// generating framer.isSamePartition()
@@ -322,7 +346,10 @@ private WindowFramer generateFramer(final List keyExprs, fina
}
for (final WindowFunction function : functions) {
- function.generateCode(cg);
+ // only generate code for the proper window functions
+ if (function.supportsCustomFrames() == useCustomFrame) {
+ function.generateCode(cg);
+ }
}
cg.getBlock("resetValues")._return(JExpr.TRUE);
@@ -356,9 +383,13 @@ private void setupIsFunction(final ClassGenerator cg, final Iterab
}
private void cleanup() {
- if (framer != null) {
- framer.cleanup();
- framer = null;
+
+ if (framers != null) {
+ for (WindowFramer framer : framers) {
+ framer.cleanup();
+ }
+
+ framers = null;
}
if (batches != null) {
@@ -383,6 +414,6 @@ protected void killIncoming(boolean sendUpstream) {
@Override
public int getRecordCount() {
- return framer.getOutputCount();
+ return framers[0].getOutputCount();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 719d5a84962..ddf3626cfe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -28,7 +28,8 @@
import java.util.List;
public interface WindowFramer {
- TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
+ TemplateClassDefinition<WindowFramer> DEFAULT_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
+ TemplateClassDefinition<WindowFramer> CUSTOM_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, CustomFrameTemplate.class);
void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext,
final boolean requireFullPartition) throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
index 548809b3652..1c712972c9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
@@ -92,6 +92,8 @@ static WindowFunction fromExpression(final FunctionCall call) {
abstract void generateCode(final ClassGenerator cg);
+ abstract boolean supportsCustomFrames();
+
/**
* @param hasOrderBy window definition contains an ORDER BY clause
* @return true if this window function requires all batches of current partition to be available before processing
@@ -161,6 +163,11 @@ public boolean requiresFullPartition(final boolean hasOrderBy) {
public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
return partitionEndReached || (hasOrderBy && frameEndReached);
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return true;
+ }
}
static class Ranking extends WindowFunction {
@@ -218,6 +225,11 @@ public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, bool
// otherwise we can process the first batch immediately
return partitionEndReached || ! requiresFullPartition(hasOrderBy);
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return false;
+ }
}
static class Ntile extends Ranking {
@@ -315,6 +327,11 @@ public boolean requiresFullPartition(final boolean hasOrderBy) {
public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
return partitionEndReached || numBatchesAvailable > 1;
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return false;
+ }
}
static class Lag extends WindowFunction {
@@ -381,6 +398,11 @@ public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, bool
assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0";
return true;
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return false;
+ }
}
static class LastValue extends WindowFunction {
@@ -419,9 +441,8 @@ void generateCode(ClassGenerator cg) {
// this will generate the the following, pseudo, code:
// write current.source_last_value[frameLastRow] to container.last_value[row]
- final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupReadLastValue", "readLastValue", null, null);
- final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupReadLastValue", "writeLastValue", "resetValues", "cleanup");
- final MappingSet mappingSet = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+ final GeneratorMapping mapping = GeneratorMapping.create("setupReadLastValue", "writeLastValue", "resetValues", "cleanup");
+ final MappingSet mappingSet = new MappingSet("index", "outIndex", mapping, mapping);
cg.setMappingSet(mappingSet);
cg.addExpr(writeSourceToLastValue);
@@ -436,6 +457,11 @@ public boolean requiresFullPartition(final boolean hasOrderBy) {
public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
return partitionEndReached || (hasOrderBy && frameEndReached);
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return true;
+ }
}
static class FirstValue extends WindowFunction {
@@ -471,10 +497,10 @@ boolean materialize(final NamedExpression ne, final VectorContainer batch, final
@Override
void generateCode(final ClassGenerator cg) {
{
- // in DefaultFrameTemplate we call setupCopyFirstValue:
- // setupCopyFirstValue(current, internal)
- // and copyFirstValueToInternal:
- // copyFirstValueToInternal(currentRow, 0)
+ // in DefaultFrameTemplate we call setupSaveFirstValue:
+ // setupSaveFirstValue(current, internal)
+ // and saveFirstValue:
+ // saveFirstValue(currentRow, 0)
//
// this will generate the the following, pseudo, code:
// write current.source[currentRow] to internal.first_value[0]
@@ -482,7 +508,7 @@ void generateCode(final ClassGenerator cg) {
// so it basically copies the first value of current partition into the first row of internal.first_value
// this is especially useful when handling multiple batches for the same partition where we need to keep
// the first value of the partition somewhere after we release the first batch
- final GeneratorMapping mapping = GeneratorMapping.create("setupCopyFirstValue", "copyFirstValueToInternal", null, null);
+ final GeneratorMapping mapping = GeneratorMapping.create("setupSaveFirstValue", "saveFirstValue", null, null);
final MappingSet mappingSet = new MappingSet("index", "0", mapping, mapping);
cg.setMappingSet(mappingSet);
@@ -490,8 +516,8 @@ void generateCode(final ClassGenerator cg) {
}
{
- // in DefaultFrameTemplate we call setupPasteValues:
- // setupPasteValues(internal, container)
+ // in DefaultFrameTemplate we call setupWriteFirstValue:
+ // setupWriteFirstValue(internal, container)
// and outputRow:
// outputRow(outIndex)
//
@@ -499,7 +525,7 @@ void generateCode(final ClassGenerator cg) {
// write internal.first_value[0] to container.first_value[outIndex]
//
// so it basically copies the value stored in internal.first_value's first row into all rows of container.first_value
- final GeneratorMapping mapping = GeneratorMapping.create("setupPasteValues", "outputRow", "resetValues", "cleanup");
+ final GeneratorMapping mapping = GeneratorMapping.create("setupWriteFirstValue", "outputRow", "resetValues", "cleanup");
final MappingSet mappingSet = new MappingSet("0", "outIndex", mapping, mapping);
cg.setMappingSet(mappingSet);
cg.addExpr(writeFirstValueToFirstValue);
@@ -516,5 +542,10 @@ public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean fr
assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0";
return true;
}
+
+ @Override
+ boolean supportsCustomFrames() {
+ return true;
+ }
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 6cb0f4a3cd5..10abbff8b1d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -98,6 +98,14 @@ public void testB1P2() throws Exception {
runTest("b1.p2", 1);
}
+ @Test
+ public void testMultipleFramers() throws Exception {
+ final String window = " OVER(PARTITION BY position_id ORDER by sub)";
+ test("SELECT COUNT(*)"+window+", SUM(salary)"+window+", ROW_NUMBER()"+window+", RANK()"+window+" " +
+ "FROM dfs_test.`"+TEST_RES_PATH+"/window/b1.p1`"
+ );
+ }
+
/**
* 2 batches with 2 partitions (position_id column), each batch contains a different partition
*/
From b950d987c076b8f2a54b62d5279ed2c1ecaaf94c Mon Sep 17 00:00:00 2001
From: adeneche
Date: Thu, 21 Jan 2016 11:09:45 -0800
Subject: [PATCH 2/2] renaming DefaultFrameTemplate and CustomFrameTemplate
---
...ustomFrameTemplate.java => FrameSupportTemplate.java} | 9 +++++----
...ultFrameTemplate.java => NoFrameSupportTemplate.java} | 9 +++++----
.../physical/impl/window/WindowFrameRecordBatch.java | 2 +-
.../drill/exec/physical/impl/window/WindowFramer.java | 4 ++--
4 files changed, 13 insertions(+), 11 deletions(-)
rename exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/{CustomFrameTemplate.java => FrameSupportTemplate.java} (96%)
rename exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/{DefaultFrameTemplate.java => NoFrameSupportTemplate.java} (96%)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
similarity index 96%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 17a1d21471d..16c751349fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/CustomFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -31,11 +31,12 @@
/**
- * WindowFramer implementation that supports the FRAME clause. Can be used with FIRST_VALUE, LAST_VALUE and
- * all aggregate functions
+ * WindowFramer implementation that supports the FRAME clause.
+ *
+ * According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
+ * This class will handle such functions even if the FRAME clause is not present.
*/
-public abstract class CustomFrameTemplate implements WindowFramer {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
+public abstract class FrameSupportTemplate implements WindowFramer {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FrameSupportTemplate.class);
private VectorContainer container;
private VectorContainer internal;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
similarity index 96%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index fae15ba7a70..ac1eefc9965 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -31,11 +31,12 @@
/**
- * WindowFramer implementation that only supports the default frame. Can be used with LEAD, LAG, ROW_NUMBER, and
- * all ranking functions
+ * WindowFramer implementation that doesn't support the FRAME clause (will assume the default frame).
+ *
+ * According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking functions don't support the FRAME clause.
+ * This class will handle such functions.
*/
-public abstract class DefaultFrameTemplate implements WindowFramer {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
+public abstract class NoFrameSupportTemplate implements WindowFramer {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoFrameSupportTemplate.class);
private VectorContainer container;
private VectorContainer internal;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 1b124548724..d6be1eb9bdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -325,7 +325,7 @@ private WindowFramer generateFramer(final List keyExprs, fina
final List functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
- WindowFramer.CUSTOM_TEMPLATE_DEFINITION : WindowFramer.DEFAULT_TEMPLATE_DEFINITION;
+ WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
final ClassGenerator cg = CodeGenerator.getRoot(definition, context.getFunctionRegistry());
{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index ddf3626cfe3..9b985c0cf3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -28,8 +28,8 @@
import java.util.List;
public interface WindowFramer {
- TemplateClassDefinition<WindowFramer> DEFAULT_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
- TemplateClassDefinition<WindowFramer> CUSTOM_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, CustomFrameTemplate.class);
+ TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
+ TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext,
final boolean requireFullPartition) throws SchemaChangeException;