Skip to content

Commit

Permalink
DRILL-6962: Function coalesce returns an Error when none of the columns in coalesce exist in a parquet file
Browse files Browse the repository at this point in the history

- Updated UntypedNullVector to hold value count when vector is allocated and transferred to another one;
- Updated RecordBatchLoader and DrillCursor to handle case when only UntypedNull values are present in RecordBatch (special case when data buffer is null but actual values are present);
- Added functions to cast UntypedNull value to other types;
- Moved UntypedReader, UntypedHolderReaderImpl and UntypedReaderImpl from org.apache.drill.exec.vector.complex.impl to org.apache.drill.exec.vector package.
  • Loading branch information
KazydubB committed Jan 22, 2019
1 parent 72cba88 commit 3b10351
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 26 deletions.
20 changes: 20 additions & 0 deletions exec/java-exec/src/main/codegen/data/Casts.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,25 @@
{from: "NullableVarBinary", to: "NullableFloat8", major: "EmptyString", javaType:"Double", parse:"Double"},

{from: "NullableVarBinary", to: "NullableVarDecimal", major: "NullableVarCharDecimalComplex"},

{from: "UntypedNull", to: "TinyInt", major: "UntypedNull"},
{from: "UntypedNull", to: "Int", major: "UntypedNull"},
{from: "UntypedNull", to: "BigInt", major: "UntypedNull"},
{from: "UntypedNull", to: "Float4", major: "UntypedNull"},
{from: "UntypedNull", to: "Float8", major: "UntypedNull"},
{from: "UntypedNull", to: "Date", major: "UntypedNull"},
{from: "UntypedNull", to: "Time", major: "UntypedNull"},
{from: "UntypedNull", to: "TimeStamp", major: "UntypedNull"},
{from: "UntypedNull", to: "Interval", major: "UntypedNull"},
{from: "UntypedNull", to: "IntervalDay", major: "UntypedNull"},
{from: "UntypedNull", to: "IntervalYear", major: "UntypedNull"},
{from: "UntypedNull", to: "VarBinary", major: "UntypedNull"},
{from: "UntypedNull", to: "VarChar", major: "UntypedNull"},
{from: "UntypedNull", to: "Var16Char", major: "UntypedNull"},
{from: "UntypedNull", to: "VarDecimal", major: "UntypedNull"},
{from: "UntypedNull", to: "Decimal9", major: "UntypedNull"},
{from: "UntypedNull", to: "Decimal18", major: "UntypedNull"},
{from: "UntypedNull", to: "Decimal28Sparse", major: "UntypedNull"},
{from: "UntypedNull", to: "Decimal38Sparse", major: "UntypedNull"},
]
}
61 changes: 61 additions & 0 deletions exec/java-exec/src/main/codegen/templates/CastUntypedNull.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<@pp.dropOutputFile />

<#list cast.types as type>
<#if type.major == "UntypedNull">

<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gcast/Cast${type.from}${type.to}.java" />

<#include "/@includes/license.ftl" />
package org.apache.drill.exec.expr.fn.impl.gcast;

import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.vector.UntypedNullHolder;

/*
* This class is generated using freemarker and the ${.template_name} template.
*/
@FunctionTemplate(name = "cast${type.to?upper_case}",
scope = FunctionTemplate.FunctionScope.SIMPLE,
nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
// Generated cast function from UntypedNull to ${type.to}. Because the input is
// the untyped NULL (e.g. a coalesce over columns that exist in no file), eval()
// unconditionally marks the output as not-set, so the cast always produces NULL.
public class Cast${type.from}${type.to} implements DrillSimpleFunc {

// Input holder; its value is never read — the result is always NULL.
@Param ${type.from}Holder in;
<#if type.to.contains("Decimal")>
// Decimal targets carry precision and scale as extra cast arguments.
@Param IntHolder precision;
@Param IntHolder scale;
<#elseif type.to == "VarChar" || type.to == "VarBinary" || type.to == "Var16Char">
// Variable-length targets carry the target length as an extra cast argument.
@Param BigIntHolder len;
</#if>
@Output Nullable${type.to}Holder out;

public void setup() {
}

public void eval() {
// isSet == 0 signals a NULL value for nullable holders.
out.isSet = 0;
}
}
</#if> <#-- type.major -->
</#list>

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.UntypedNullVector;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -128,6 +129,11 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti

// Load the vector.
if (buf == null) {
// Buffers for untyped null vectors are always null and for the case
// field value alone is sufficient to load the vector
if (vector instanceof UntypedNullVector) {
vector.load(field, null);
}
// Schema only
} else if (field.getValueCount() == 0) {
AllocationHelper.allocate(vector, 0, 0, 0);
Expand Down
86 changes: 86 additions & 0 deletions exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,91 @@ public void testTypeAndMode() throws Exception {
assertEquals(0, summary.recordCount());
}

// Coalesce over columns absent from the parquet file must yield NULL for every returned row.
@Test
public void testCoalesceOnNotExistentColumns() throws Exception {
  testBuilder()
      .sqlQuery("select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5")
      .unOrdered()
      .baselineColumns("coal")
      .baselineValuesForSingleColumn(null, null, null, null, null)
      .go();
}

// Grouping by the untyped-null coalesce result must collapse to a single NULL group.
@Test
public void testCoalesceOnNotExistentColumnsWithGroupBy() throws Exception {
  String sql = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` group by 1";
  testBuilder()
      .sqlQuery(sql)
      .unOrdered()
      .baselineColumns("coal")
      .baselineValuesForSingleColumn(new Object[] {null})
      .go();
}

// Ordering by the untyped-null coalesce column must not fail; all rows remain NULL.
@Test
public void testCoalesceOnNotExistentColumnsWithOrderBy() throws Exception {
  testBuilder()
      .sqlQuery("select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` order by 1 limit 5")
      .unOrdered()
      .baselineColumns("coal")
      .baselineValuesForSingleColumn(null, null, null, null, null)
      .go();
}

// A comparison against the untyped NULL in WHERE is never true, so no rows qualify.
@Test
public void testCoalesceOnNotExistentColumnsWithCoalesceInWhereClause() throws Exception {
  testBuilder()
      .sqlQuery("select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` where coalesce(unk1, unk2) > 10")
      .unOrdered()
      .expectsNumRecords(0)
      .go();
}

// count() over the untyped NULL is 0 per group, so the HAVING predicate filters out every group.
@Test
public void testCoalesceOnNotExistentColumnsWithCoalesceInHavingClause() throws Exception {
  String sql = "select 1 from cp.`tpch/nation.parquet` group by n_name having count(coalesce(unk1, unk2)) > 10";
  testBuilder()
      .sqlQuery(sql)
      .unOrdered()
      .expectsNumRecords(0)
      .go();
}

// Partitioning a window function by the untyped NULL puts all rows in one partition,
// so row_number() runs 1..5 over the limited result.
@Test
public void testPartitionByCoalesceOnNotExistentColumns() throws Exception {
  String sql =
      "select row_number() over (partition by coalesce(unk1, unk2)) as row_num from cp.`tpch/nation.parquet` limit 5";
  testBuilder()
      .sqlQuery(sql)
      .unOrdered()
      .baselineColumns("row_num")
      .baselineValuesForSingleColumn(1L, 2L, 3L, 4L, 5L)
      .go();
}

// Passing the untyped NULL through substr() must propagate NULL rather than error out.
@Test
public void testCoalesceOnNotExistentColumnsInUDF() throws Exception {
  testBuilder()
      .sqlQuery("select substr(coalesce(unk1, unk2), 1, 2) as coal from cp.`tpch/nation.parquet` limit 5")
      .unOrdered()
      .baselineColumns("coal")
      .baselineValuesForSingleColumn(null, null, null, null, null)
      .go();
}

// Passing the untyped NULL through abs() must propagate NULL rather than error out.
@Test
public void testCoalesceOnNotExistentColumnsInUDF2() throws Exception {
  testBuilder()
      .sqlQuery("select abs(coalesce(unk1, unk2)) as coal from cp.`tpch/nation.parquet` limit 5")
      .unOrdered()
      .baselineColumns("coal")
      .baselineValuesForSingleColumn(null, null, null, null, null)
      .go();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -749,4 +749,21 @@ public void testCastTimeLiteralInFilter() throws Exception {
run("drop table if exists dfs.tmp.test_time_filter");
}
}

// Casting the untyped NULL (coalesce of non-existent columns) to each supported
// target type must succeed and yield a single NULL value.
@Test
public void testCastUntypedNull() throws Exception {
  String query = "select cast(coalesce(unk1, unk2) as %s) as coal from cp.`tpch/nation.parquet` limit 1";
  String[] types = new String[] {
      "BOOLEAN", "INT", "BIGINT", "FLOAT", "DOUBLE", "DATE", "TIME", "TIMESTAMP", "INTERVAL MONTH",
      "INTERVAL YEAR", "VARBINARY", "VARCHAR", "DECIMAL(9)", "DECIMAL(18)", "DECIMAL(28)", "DECIMAL(38)"
  };
  for (String targetType : types) {
    testBuilder()
        .sqlQuery(String.format(query, targetType))
        .unOrdered()
        .baselineColumns("coal")
        .baselineValues(new Object[] {null})
        .go();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,11 @@ private boolean nextRowInternally() throws SQLException {
QueryDataBatch qrb = resultsListener.getNext();

// (Apparently:) Skip any spurious empty batches (batches that have
// zero rows and/or null data, other than the first batch (which carries
// zero rows and null data, other than the first batch (which carries
// the (initial) schema but no rows)).
if ( afterFirstBatch ) {
while ( qrb != null
&& ( qrb.getHeader().getRowCount() == 0
|| qrb.getData() == null ) ) {
if (afterFirstBatch) {
while (qrb != null
&& (qrb.getHeader().getRowCount() == 0 && qrb.getData() == null)) {
// Empty message--dispose of and try to get another.
logger.warn( "Spurious batch read: {}", qrb );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
* This class is generated using freemarker and the ${.template_name} template.
*/
@SuppressWarnings("unused")
abstract class AbstractFieldReader extends AbstractBaseReader implements FieldReader {
public abstract class AbstractFieldReader extends AbstractBaseReader implements FieldReader {

AbstractFieldReader() {
public AbstractFieldReader() {
}

/**
Expand Down
2 changes: 2 additions & 0 deletions exec/vector/src/main/codegen/templates/BasicTypeHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public static int getSize(MajorType major) {
case FIXEDCHAR: return major.getPrecision();
case FIXED16CHAR: return major.getPrecision();
case FIXEDBINARY: return major.getPrecision();
case NULL:
return 0;
}
throw new UnsupportedOperationException(buildErrorMessage("get size", major));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.complex.impl;
package org.apache.drill.exec.vector;

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.vector.UntypedNullHolder;
import org.apache.drill.exec.vector.complex.impl.AbstractFieldReader;

public class UntypedHolderReaderImpl extends AbstractFieldReader {

Expand Down Expand Up @@ -47,5 +47,4 @@ public TypeProtos.MajorType getType() {
public boolean isSet() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.vector;


import org.apache.drill.exec.vector.complex.impl.UntypedReaderImpl;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
Expand Down Expand Up @@ -47,7 +45,6 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe

public UntypedNullVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
valueCount = 0;
}

@Override
Expand Down Expand Up @@ -77,7 +74,9 @@ public void allocateNew() { }
public boolean allocateNewSafe() { return true; }

@Override
public void allocateNew(final int valueCount) { }
public void allocateNew(final int valueCount) {
this.valueCount = valueCount;
}

@Override
public void reset() { }
Expand Down Expand Up @@ -125,7 +124,9 @@ public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((UntypedNullVector) to);
}

public void transferTo(UntypedNullVector target) { }
public void transferTo(UntypedNullVector target) {
target.valueCount = valueCount;
}

public void splitAndTransferTo(int startIndex, int length, UntypedNullVector target) { }

Expand Down Expand Up @@ -170,7 +171,6 @@ public void copyFromSafe(int fromIndex, int thisIndex, UntypedNullVector from) {

@Override
public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
((UntypedNullVector) from).data.getBytes(fromIndex * 4, data, toIndex * 4, 4);
}

public final class Accessor extends BaseAccessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.complex.impl;
package org.apache.drill.exec.vector;

import org.apache.drill.exec.vector.UntypedNullHolder;
import org.apache.drill.exec.vector.complex.reader.BaseReader;

public interface UntypedReader extends BaseReader {
Expand All @@ -26,5 +25,4 @@ public interface UntypedReader extends BaseReader {
int size();
void read(UntypedNullHolder holder);
void read(int arrayIndex, UntypedNullHolder holder);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.complex.impl;
package org.apache.drill.exec.vector;

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.vector.UntypedNullHolder;
import org.apache.drill.exec.vector.complex.impl.AbstractFieldReader;

public class UntypedReaderImpl extends AbstractFieldReader {

Expand Down Expand Up @@ -46,5 +46,4 @@ public void read(UntypedNullHolder holder) {
public void read(int arrayIndex, UntypedNullHolder holder) {
holder.isSet = 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
*/
package org.apache.drill.exec.vector.complex.reader;

import org.apache.drill.exec.vector.complex.impl.UntypedReader;
import org.apache.drill.exec.vector.UntypedReader;
import org.apache.drill.exec.vector.complex.reader.BaseReader.ListReader;
import org.apache.drill.exec.vector.complex.reader.BaseReader.MapReader;
import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedListReader;
import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedMapReader;
import org.apache.drill.exec.vector.complex.reader.BaseReader.ScalarReader;



public interface FieldReader extends MapReader, ListReader, ScalarReader, RepeatedMapReader, RepeatedListReader, UntypedReader {
}
}

0 comments on commit 3b10351

Please sign in to comment.