diff --git a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java index 3b1d86f8111..1d6e0a7592c 100644 --- a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java +++ b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java @@ -17,6 +17,12 @@ */ <@pp.dropOutputFile /> +<#macro reassignHolder> + previous.buffer = buf.reallocIfNeeded(length); + previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); + previous.end = in.end - in.start; + + <@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/GNewValueFunctions.java" /> <#include "/@includes/license.ftl" /> @@ -39,28 +45,51 @@ */ public class GNewValueFunctions { <#list vv.types as type> -<#if type.major == "Fixed" || type.major = "Bit"> - <#list type.minor as minor> <#list vv.modes as mode> <#if mode.name != "Repeated"> <#if !minor.class.startsWith("Decimal28") && !minor.class.startsWith("Decimal38") && !minor.class.startsWith("Interval")> @SuppressWarnings("unused") -@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL) -public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc{ +@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) +public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc { @Param ${mode.prefix}${minor.class}Holder in; @Workspace ${mode.prefix}${minor.class}Holder previous; @Workspace Boolean initialized; @Output BitHolder out; + <#if type.major == "VarLen"> + @Inject DrillBuf buf; + public void setup() { initialized = false; + <#if type.major == "VarLen"> + previous.buffer = buf; + previous.start = 0; + } - <#if mode.name == "Required"> public void eval() { + <#if mode.name == "Required"> + <#if type.major == "VarLen"> + int length = in.end - in.start; + + if (initialized) { + if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare( + previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) { + out.value = 0; + } else { + <@reassignHolder/> + out.value = 1; + } + } else { + <@reassignHolder/> + out.value = 1; + initialized = true; + } + + <#if type.major == "Fixed" || type.major == "Bit"> if (initialized) { if (in.value == previous.value) { out.value = 0; @@ -73,10 +102,36 @@ public void eval() { out.value = 1; initialized = true; } - } + <#-- mode.name == "Required" --> + <#if mode.name == "Optional"> - public void eval() { + <#if type.major == "VarLen"> + int length = in.isSet == 0 ? 0 : in.end - in.start; + + if (initialized) { + if (previous.isSet == 0 && in.isSet == 0) { + out.value = 0; + } else if (previous.isSet != 0 && in.isSet != 0 && org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare( + previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) { + out.value = 0; + } else { + if (in.isSet == 1) { + <@reassignHolder/> + } + previous.isSet = in.isSet; + out.value = 1; + } + } else { + if (in.isSet == 1) { + <@reassignHolder/> + } + previous.isSet = in.isSet; + out.value = 1; + initialized = true; + } + + <#if type.major == "Fixed" || type.major == "Bit"> if (initialized) { if (in.isSet == 0 && previous.isSet == 0) { out.value = 0; @@ -93,14 +148,14 @@ public void eval() { out.value = 1; initialized = true; } - } + <#-- mode.name == "Optional" --> + } } <#-- minor.class.startWith --> <#-- mode.name --> - <#-- type.major --> } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java deleted file mode 100644 index fedb4733a5f..00000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store; - -import io.netty.buffer.DrillBuf; -import org.apache.drill.exec.expr.DrillSimpleFunc; -import org.apache.drill.exec.expr.annotations.FunctionTemplate; -import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; -import org.apache.drill.exec.expr.annotations.Output; -import org.apache.drill.exec.expr.annotations.Param; -import org.apache.drill.exec.expr.annotations.Workspace; -import org.apache.drill.exec.expr.holders.BitHolder; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; -import org.apache.drill.exec.expr.holders.NullableVarCharHolder; -import org.apache.drill.exec.expr.holders.VarBinaryHolder; -import org.apache.drill.exec.expr.holders.VarCharHolder; - -import javax.inject.Inject; - -/** - * The functions are similar to those created through FreeMarker template for fixed types. There is not much benefit to - * using code generation for generating the functions for variable length types, so simply doing them by hand. - */ -public class NewValueFunction { - - @FunctionTemplate(name = "newPartitionValue", - scope = FunctionTemplate.FunctionScope.SIMPLE, - nulls = NullHandling.INTERNAL) - public static class NewValueVarChar implements DrillSimpleFunc { - - @Param VarCharHolder in; - @Workspace VarCharHolder previous; - @Workspace Boolean initialized; - @Output BitHolder out; - @Inject DrillBuf buf; - - public void setup() { - initialized = false; - previous.buffer = buf; - previous.start = 0; - } - - public void eval() { - int length = in.end - in.start; - - if (initialized) { - if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) { - out.value = 0; - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - out.value = 1; - } - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - out.value = 1; - initialized = true; - } - } - } - - @FunctionTemplate(name = "newPartitionValue", - scope = FunctionTemplate.FunctionScope.SIMPLE, - nulls = NullHandling.INTERNAL) - public static class NewValueVarCharNullable implements DrillSimpleFunc { - - @Param NullableVarCharHolder in; - @Workspace NullableVarCharHolder previous; - @Workspace Boolean initialized; - @Output BitHolder out; - @Inject DrillBuf buf; - - public void setup() { - initialized = false; - previous.buffer = buf; - previous.start = 0; - } - - public void eval() { - int length = in.isSet == 0 ? 0 : in.end - in.start; - - if (initialized) { - if (previous.isSet == 0 && in.isSet == 0 || - (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare( - previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) { - out.value = 0; - } else { - if (in.isSet == 1) { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - } - previous.isSet = in.isSet; - out.value = 1; - } - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - previous.isSet = 1; - out.value = 1; - initialized = true; - } - } - } - - @FunctionTemplate(name = "newPartitionValue", - scope = FunctionTemplate.FunctionScope.SIMPLE, - nulls = NullHandling.INTERNAL) - public static class NewValueVarBinary implements DrillSimpleFunc { - - @Param VarBinaryHolder in; - @Workspace VarBinaryHolder previous; - @Workspace Boolean initialized; - @Output BitHolder out; - @Inject DrillBuf buf; - - public void setup() { - initialized = false; - previous.buffer = buf; - previous.start = 0; - } - - public void eval() { - int length = in.end - in.start; - - if (initialized) { - if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) { - out.value = 0; - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - out.value = 1; - } - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - out.value = 1; - initialized = true; - } - } - } - - @FunctionTemplate(name = "newPartitionValue", - scope = FunctionTemplate.FunctionScope.SIMPLE, - nulls = NullHandling.INTERNAL) - public static class NewValueVarBinaryNullable implements DrillSimpleFunc { - - @Param NullableVarBinaryHolder in; - @Workspace NullableVarBinaryHolder previous; - @Workspace Boolean initialized; - @Output BitHolder out; - @Inject DrillBuf buf; - - public void setup() { - initialized = false; - previous.buffer = buf; - previous.start = 0; - } - - public void eval() { - int length = in.isSet == 0 ? 0 : in.end - in.start; - - if (initialized) { - if (previous.isSet == 0 && in.isSet == 0 || - (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare( - previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) { - out.value = 0; - } else { - if (in.isSet == 1) { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - } - previous.isSet = in.isSet; - out.value = 1; - } - } else { - previous.buffer = buf.reallocIfNeeded(length); - previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start); - previous.end = in.end - in.start; - previous.isSet = 1; - out.value = 1; - initialized = true; - } - } - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java index 18071a01655..36ee1b91659 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java @@ -25,6 +25,8 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.junit.Ignore; import org.junit.Test; @@ -285,41 +287,51 @@ public void stddevEmptyNonexistentNullableInput() throws Exception { } @Test - public void minEmptyNonnullableInput() throws Exception { - // test min function on required type - String query = "select " + - "min(bool_col) col1, min(int_col) col2, min(bigint_col) col3, min(float4_col) col4, min(float8_col) col5, " + - "min(date_col) col6, min(time_col) col7, min(timestamp_col) col8, min(interval_year_col) col9, " + - "min(varhcar_col) col10 " + - "from cp.`parquet/alltypes_required.parquet` where 1 = 0"; - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10") - .baselineValues(null, null, null, null, null, null, null, null, null, null) - .go(); - } + public void minMaxEmptyNonNullableInput() throws Exception { + // test min and max functions on required type + + final QueryDataBatch result = testSqlWithResults("select * from cp.`parquet/alltypes_required.parquet` limit 0") + .get(0); + + final Map functions = Maps.newHashMap(); + functions.put("min", new StringBuilder()); + functions.put("max", new StringBuilder()); + + final Map resultingValues = Maps.newHashMap(); + for (UserBitShared.SerializedField field : result.getHeader().getDef().getFieldList()) { + final String fieldName = field.getNamePart().getName(); + // Only COUNT aggregate function supported for Boolean type + if (fieldName.equals("col_bln")) { + continue; + } + resultingValues.put(String.format("`%s`", fieldName), null); + for (Map.Entry function : functions.entrySet()) { + function.getValue() + .append(function.getKey()) + .append("(") + .append(fieldName) + .append(") ") + .append(fieldName) + .append(","); + } + } + result.release(); - @Test - public void maxEmptyNonnullableInput() throws Exception { + final String query = "select %s from cp.`parquet/alltypes_required.parquet` where 1 = 0"; + final List> baselineRecords = Lists.newArrayList(); + baselineRecords.add(resultingValues); - // test max function - final String query = "select " + - "max(int_col) col1, max(bigint_col) col2, max(float4_col) col3, max(float8_col) col4, " + - "max(date_col) col5, max(time_col) col6, max(timestamp_col) col7, max(interval_year_col) col8, " + - "max(varhcar_col) col9 " + - "from cp.`parquet/alltypes_required.parquet` where 1 = 0"; + for (StringBuilder selectBody : functions.values()) { + selectBody.setLength(selectBody.length() - 1); - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9") - .baselineValues(null, null, null, null, null, null, null, null, null) - .go(); + testBuilder() + .sqlQuery(query, selectBody.toString()) + .unOrdered() + .baselineRecords(baselineRecords) + .go(); + } } - /* * Streaming agg on top of a filter produces wrong results if the first two batches are filtered out. * In the below test we have three files in the input directory and since the ordering of reading diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java index 5294709c6da..9d9b403ede3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java @@ -17,11 +17,15 @@ */ package org.apache.drill.exec.sql; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.junit.Test; import java.io.File; +import java.util.Map; public class TestCTAS extends BaseTestQuery { @Test // DRILL-2589 @@ -125,8 +129,7 @@ public void ctasPartitionWithEmptyList() throws Exception { try { final String ctasQuery = String.format("CREATE TABLE %s.%s PARTITION BY AS SELECT * from cp.`region.json`", TEMP_SCHEMA, newTblName); - errorMsgTestHelper(ctasQuery, - String.format("PARSE ERROR: Encountered \"AS\"")); + errorMsgTestHelper(ctasQuery,"PARSE ERROR: Encountered \"AS\""); } finally { FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), newTblName)); } @@ -238,6 +241,41 @@ public void ctasWithPartition() throws Exception { } } + @Test + public void testPartitionByForAllTypes() throws Exception { + final String location = "partitioned_tables_with_nulls"; + final String ctasQuery = "create table %s partition by (%s) as %s"; + final String tablePath = "%s.`%s/%s_%s`"; + + // key - new table suffix, value - data query + final Map variations = Maps.newHashMap(); + variations.put("required", "select * from cp.`parquet/alltypes_required.parquet`"); + variations.put("optional", "select * from cp.`parquet/alltypes_optional.parquet`"); + variations.put("nulls_only", "select * from cp.`parquet/alltypes_optional.parquet` where %s is null"); + + try { + final QueryDataBatch result = testSqlWithResults("select * from cp.`parquet/alltypes_required.parquet` limit 0").get(0); + for (UserBitShared.SerializedField field : result.getHeader().getDef().getFieldList()) { + final String fieldName = field.getNamePart().getName(); + + for (Map.Entry variation : variations.entrySet()) { + final String table = String.format(tablePath, TEMP_SCHEMA, location, fieldName, variation.getKey()); + final String dataQuery = String.format(variation.getValue(), fieldName); + test(ctasQuery, table, fieldName, dataQuery, fieldName); + testBuilder() + .sqlQuery("select * from %s", table) + .unOrdered() + .sqlBaselineQuery(dataQuery) + .build() + .run(); + } + } + result.release(); + } finally { + FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), location)); + } + } + private static void ctasErrorTestHelper(final String ctasSql, final String expErrorMsg) throws Exception { final String createTableSql = String.format(ctasSql, TEMP_SCHEMA, "testTableName"); errorMsgTestHelper(createTableSql, expErrorMsg); diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet b/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet new file mode 100644 index 00000000000..53f5fa19d2f Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet b/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet index 549e316bed2..efc6add0cb5 100644 Binary files a/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet and b/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet differ