Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,13 @@ private HoldingContainer visitValueVectorWriteExpression(ValueVectorWriteExpress
final JInvocation setMeth = GetSetVectorHelper.write(e.getChild().getMajorType(), vv, inputContainer, outIndex, e.isSafe() ? "setSafe" : "set");
if (inputContainer.isOptional()) {
JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
block = jc._then();
jc._then().add(setMeth);
if (e.isWriteNulls()) {
jc._else().add(vv.invoke("getMutator").invoke("setNull").arg(outIndex));
}
} else {
block.add(setMeth);
}
block.add(setMeth);

}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,27 @@ public class ValueVectorWriteExpression implements LogicalExpression {
private final TypedFieldId fieldId;
private final LogicalExpression child;
private final boolean safe;
/**
* When writing into a value vector the default assumption is that the destination vector is empty, so we don't need
* to write null values and we can just "skip" their position. Setting writeNulls to true will override this behavior
* and explicitly write null values into the destination value vector. This is especially useful when the value vector
* is used as an internal storage that can be reused without clearing it first.
*/
private final boolean writeNulls;

public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child){
this(fieldId, child, false);
this(fieldId, child, false, false);
}

public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child, boolean safe){
this(fieldId, child, safe, false);
}

public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child, boolean safe, boolean writeNulls){
this.fieldId = fieldId;
this.child = child;
this.safe = safe;
this.writeNulls = writeNulls;
}

public TypedFieldId getFieldId() {
Expand All @@ -59,6 +71,13 @@ public boolean isSafe() {
return safe;
}

/**
* @return true if NULL values are explicitly written
*/
public boolean isWriteNulls() {
return writeNulls;
}

@Override
public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
return visitor.visitUnknown(this, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public void doWork() throws DrillException {

final WindowDataBatch current = batches.get(0);

setupCopyFirstValue(current, internal);

// we need to store the record count explicitly, because we release current batch at the end of this call
outputCount = current.getRecordCount();

Expand Down Expand Up @@ -136,14 +138,12 @@ private void newPartition(final WindowDataBatch current, final int currentRow) t
final long length = computePartitionSize(currentRow);
partition = new Partition(length);
setupPartition(current, container);
setupCopyFirstValue(current, internal);
copyFirstValueToInternal(currentRow);
}

private void cleanPartition() {
partition = null;
resetValues();
internal.zeroVectors();
lagCopiedToInternal = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JInvocation;
import com.sun.codemodel.JVar;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
Expand All @@ -40,6 +41,8 @@
import org.apache.drill.exec.record.VectorContainer;

public abstract class WindowFunction {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFunction.class);

public enum Type {
ROW_NUMBER,
RANK,
Expand Down Expand Up @@ -181,12 +184,12 @@ public Ntile() {
private int numTilesFromExpression(LogicalExpression numTilesExpr) {
if ((numTilesExpr instanceof ValueExpressions.IntExpression)) {
int nt = ((ValueExpressions.IntExpression) numTilesExpr).getInt();
if (nt >= 0) {
if (nt > 0) {
return nt;
}
}

throw new IllegalArgumentException("NTILE only accepts unsigned integer argument");
throw UserException.functionError().message("NTILE only accepts positive integer argument").build(logger);
}

@Override
Expand Down Expand Up @@ -285,7 +288,7 @@ boolean materialize(final NamedExpression ne, final VectorContainer batch, final
batch.addOrGet(output).allocateNew();
final TypedFieldId outputId = batch.getValueVectorId(ne.getRef());

writeInputToLag = new ValueVectorWriteExpression(outputId, input, true);
writeInputToLag = new ValueVectorWriteExpression(outputId, input, true, true);
writeLagToLag = new ValueVectorWriteExpression(outputId, new ValueVectorReadExpression(outputId), true);
return true;
}
Expand Down Expand Up @@ -383,7 +386,7 @@ boolean materialize(final NamedExpression ne, final VectorContainer batch, final
// write incoming.first_value[inIndex] to outgoing.first_value[outIndex]
writeFirstValueToFirstValue = new ValueVectorWriteExpression(outputId, new ValueVectorReadExpression(outputId), true);
// write incoming.source[inIndex] to outgoing.first_value[outIndex]
writeInputToFirstValue = new ValueVectorWriteExpression(outputId, input, true);
writeInputToFirstValue = new ValueVectorWriteExpression(outputId, input, true, true);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.apache.drill.BaseTestQuery;
import org.apache.drill.DrillTestWrapper;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -276,36 +280,61 @@ public void test3648Fix() throws Exception {
.run();
}

@Test
public void test3654Fix() throws Exception {
test("SELECT FIRST_VALUE(col8) OVER(PARTITION BY col7 ORDER BY col8) FROM dfs_test.`%s/window/3648.parquet`", TEST_RES_PATH);
}

@Test
public void test3643Fix() throws Exception {
try {
test("SELECT NTILE(0) OVER(PARTITION BY col7 ORDER BY col8) FROM dfs_test.`%s/window/3648.parquet`", TEST_RES_PATH);
fail("Query should have failed");
} catch (UserRemoteException e) {
assertEquals(ErrorType.FUNCTION, e.getErrorType());
}
}

@Test
public void test3668Fix() throws Exception {
testBuilder()
.sqlQuery(getFile("window/3668.sql"), TEST_RES_PATH)
.ordered()
.baselineColumns("cnt").baselineValues(2L)
.build()
.run();
}

@Test
public void testPartitionNtile() {
Partition partition = new Partition(12);

Assert.assertEquals(1, partition.ntile(5));
assertEquals(1, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(1, partition.ntile(5));
assertEquals(1, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(1, partition.ntile(5));
assertEquals(1, partition.ntile(5));

partition.rowAggregated();
Assert.assertEquals(2, partition.ntile(5));
assertEquals(2, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(2, partition.ntile(5));
assertEquals(2, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(2, partition.ntile(5));
assertEquals(2, partition.ntile(5));

partition.rowAggregated();
Assert.assertEquals(3, partition.ntile(5));
assertEquals(3, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(3, partition.ntile(5));
assertEquals(3, partition.ntile(5));

partition.rowAggregated();
Assert.assertEquals(4, partition.ntile(5));
assertEquals(4, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(4, partition.ntile(5));
assertEquals(4, partition.ntile(5));

partition.rowAggregated();
Assert.assertEquals(5, partition.ntile(5));
assertEquals(5, partition.ntile(5));
partition.rowAggregated();
Assert.assertEquals(5, partition.ntile(5));
assertEquals(5, partition.ntile(5));
}
}
Binary file not shown.
6 changes: 6 additions & 0 deletions exec/java-exec/src/test/resources/window/3668.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select count(fv) as cnt
from (
select first_value(c2) over(partition by c2 order by c1) as fv
from dfs_test.`%s/window/3668.parquet`
)
where fv = 'e'