Skip to content
Permalink
Browse files
DRILL-6375 : Support for ANY_VALUE aggregate function
closes #1256
  • Loading branch information
Gautam Parai authored and sohami committed Jun 7, 2018
1 parent cd219ff commit a27a1047b16621f6c3c6c181c97f8713231f6c6c
Showing 37 changed files with 1,265 additions and 49 deletions.
@@ -130,7 +130,7 @@ public int getParamCount() {
* @return workspace variables
*/
@Override
public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables){
public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference){
JVar[] workspaceJVars = new JVar[5];

workspaceJVars[0] = g.declareClassField("returnOI", g.getModel()._ref(ObjectInspector.class));
@@ -88,6 +88,52 @@
{inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
{inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"}
]
},
{className: "AnyValue", funcName: "any_value", types: [
{inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"},
{inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "Numeric"},
{inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"},
{inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"},
{inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "Numeric"},
{inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"},
{inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"},
{inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"},
{inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"},
{inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"},
{inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"},
{inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"},
{inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"},
{inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"},
{inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"},
{inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"},
{inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"},
{inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"},
{inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"},
{inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"},
{inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
{inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
{inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""},
{inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""},
{inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"},
{inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"},
{inputType: "List", outputType: "List", runningType: "List", major: "Complex"},
{inputType: "Map", outputType: "Map", runningType: "Map", major: "Complex"},
{inputType: "RepeatedBit", outputType: "RepeatedBit", runningType: "RepeatedBit", major: "Complex"},
{inputType: "RepeatedInt", outputType: "RepeatedInt", runningType: "RepeatedInt", major: "Complex"},
{inputType: "RepeatedBigInt", outputType: "RepeatedBigInt", runningType: "RepeatedBigInt", major: "Complex"},
{inputType: "RepeatedFloat4", outputType: "RepeatedFloat4", runningType: "RepeatedFloat4", major: "Complex"},
{inputType: "RepeatedFloat8", outputType: "RepeatedFloat8", runningType: "RepeatedFloat8", major: "Complex"},
{inputType: "RepeatedDate", outputType: "RepeatedDate", runningType: "RepeatedDate", major: "Complex"},
{inputType: "RepeatedTimeStamp", outputType: "RepeatedTimeStamp", runningType: "RepeatedTimeStamp", major: "Complex"},
{inputType: "RepeatedTime", outputType: "RepeatedTime", runningType: "RepeatedTime", major: "Complex"},
{inputType: "RepeatedIntervalDay", outputType: "RepeatedIntervalDay", runningType: "RepeatedIntervalDay", major: "Complex"},
{inputType: "RepeatedIntervalYear", outputType: "RepeatedIntervalYear", runningType: "RepeatedIntervalYear", major: "Complex"},
{inputType: "RepeatedInterval", outputType: "RepeatedInterval", runningType: "RepeatedInterval", major: "Complex"},
{inputType: "RepeatedVarChar", outputType: "RepeatedVarChar", runningType: "RepeatedVarChar", major: "Complex"},
{inputType: "RepeatedVarBinary", outputType: "RepeatedVarBinary", runningType: "RepeatedVarBinary", major: "Complex"},
{inputType: "RepeatedList", outputType: "RepeatedList", runningType: "RepeatedList", major: "Complex"},
{inputType: "RepeatedMap", outputType: "RepeatedMap", runningType: "RepeatedMap", major: "Complex"}
]
}
]
}
@@ -35,6 +35,12 @@
{inputType: "VarDecimal", outputType: "NullableVarDecimal"},
{inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"}
]
},
{className: "AnyValue", funcName: "any_value", types: [
{inputType: "VarDecimal", outputType: "NullableVarDecimal"},
{inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"},
{inputType: "RepeatedVarDecimal", outputType: "RepeatedVarDecimal"}
]
}
]
}
@@ -61,11 +61,11 @@ public void setup() {
value = new ${type.runningType}Holder();
nonNullCount = new BigIntHolder();
nonNullCount.value = 0;
<#if aggrtype.funcName == "sum">
<#if aggrtype.funcName == "sum" || aggrtype.funcName == "any_value">
value.value = 0;
<#elseif aggrtype.funcName == "min">
<#if type.runningType?starts_with("Bit")>
value.value = 1;
value.value = 1;
<#elseif type.runningType?starts_with("Int")>
value.value = Integer.MAX_VALUE;
<#elseif type.runningType?starts_with("BigInt")>
@@ -77,7 +77,7 @@ public void setup() {
</#if>
<#elseif aggrtype.funcName == "max">
<#if type.runningType?starts_with("Bit")>
value.value = 0;
value.value = 0;
<#elseif type.runningType?starts_with("Int")>
value.value = Integer.MIN_VALUE;
<#elseif type.runningType?starts_with("BigInt")>
@@ -110,19 +110,21 @@ public void add() {
value.value = Float.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
}
<#elseif type.inputType?contains("Float8")>
if(!Double.isNaN(in.value)) {
value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
}
<#else>
if(!Double.isNaN(in.value)) {
value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
}
<#else>
value.value = Math.min(value.value, in.value);
</#if>
</#if>
<#elseif aggrtype.funcName == "max">
value.value = Math.max(value.value, in.value);
<#elseif aggrtype.funcName == "sum">
value.value += in.value;
<#elseif aggrtype.funcName == "count">
value.value++;
<#else>
<#elseif aggrtype.funcName == "any_value">
value.value = in.value;
<#else>
// TODO: throw an error ?
</#if>
<#if type.inputType?starts_with("Nullable")>
@@ -143,7 +145,7 @@ public void output() {
@Override
public void reset() {
nonNullCount.value = 0;
<#if aggrtype.funcName == "sum" || aggrtype.funcName == "count">
<#if aggrtype.funcName == "sum" || aggrtype.funcName == "count" || aggrtype.funcName == "any_value">
value.value = 0;
<#elseif aggrtype.funcName == "min">
<#if type.runningType?starts_with("Int")>
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<@pp.dropOutputFile />



<#list aggrtypes1.aggrtypes as aggrtype>
<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/${aggrtype.className}ComplexFunctions.java" />

<#include "/@includes/license.ftl" />

/*
* This class is generated using freemarker and the ${.template_name} template.
*/

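<#-- A template used to generate Java code for aggregate functions over complex inputs (types whose major is "Complex"). -->
<#-- The first value seen is copied to the output through a ComplexWriter; a counter only flags that a value was already written. This includes: ANY_VALUE. -->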

package org.apache.drill.exec.expr.fn.impl.gaggr;

import org.apache.drill.exec.expr.DrillAggFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.MapUtility;
import org.apache.drill.exec.vector.complex.writer.*;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.*;

/**
 * Container for the generated aggregate function classes that operate on complex inputs.
 * One nested function class is emitted for every entry of aggrtype.types whose
 * major is "Complex"; entries with any other major are skipped by this template.
 */
@SuppressWarnings("unused")

public class ${aggrtype.className}ComplexFunctions {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${aggrtype.className}ComplexFunctions.class);

<#list aggrtype.types as type>
<#if type.major == "Complex">

/**
 * Point aggregate over a single complex input that emits its result through a ComplexWriter
 * instead of an output holder. The nonNullCount workspace field is used purely as a
 * "value already written" flag (0 = nothing written yet, 1 = written).
 */
@FunctionTemplate(name = "${aggrtype.funcName}", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc{
@Param ${type.inputType}Holder inHolder;
@Workspace BigIntHolder nonNullCount;
@Output ComplexWriter writer;

/** Creates the flag holder and marks that no value has been written yet. */
public void setup() {
nonNullCount = new BigIntHolder();
nonNullCount.value = 0;
}

/**
 * Consumes one input value. For any_value, only the first value seen (while
 * nonNullCount is still 0) is handed to the writer; later calls just keep the flag at 1.
 */
@Override
public void add() {
<#if type.inputType?starts_with("Nullable")>
// Labeled block: breaking out of it skips the write and the flag update below.
sout: {
if (inHolder.isSet == 0) {
// processing nullable input and the value is null, so don't do anything...
break sout;
}
</#if>
<#if aggrtype.funcName == "any_value">
<#-- Branch order matters: "RepeatedMap" and "RepeatedList" must be tested before the generic "Repeated" prefix. -->
<#if type.runningType?starts_with("Map")>
// NOTE(review): presumably copies the map read from inHolder.reader into the writer - confirm in MappifyUtility.
if (nonNullCount.value == 0) {
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createMap(inHolder.reader, writer, "any_value");
}
<#elseif type.runningType?starts_with("RepeatedMap")>
if (nonNullCount.value == 0) {
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value");
}
<#elseif type.runningType?starts_with("List")>
if (nonNullCount.value == 0) {
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value");
}
<#elseif type.runningType?starts_with("RepeatedList")>
if (nonNullCount.value == 0) {
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value");
}
<#elseif type.runningType?starts_with("Repeated")>
// Remaining Repeated* types (everything except RepeatedMap/RepeatedList) go through createList.
if (nonNullCount.value == 0) {
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value");
}
</#if>
</#if>
// Mark that a value has been seen so subsequent add() calls do not write again.
nonNullCount.value = 1;
<#if type.inputType?starts_with("Nullable")>
} // end of sout block
</#if>
}

/** Intentionally empty: nothing is copied here because add() already wrote through the writer. */
@Override
public void output() {
//Do nothing since the complex writer takes care of everything!
}

/** Clears the "value already written" flag (only generated for any_value). */
@Override
public void reset() {
<#if aggrtype.funcName == "any_value">
nonNullCount.value = 0;
</#if>
}
}
</#if>
</#list>
}
</#list>
@@ -131,7 +131,16 @@ public void add() {
</#if>
<#elseif aggrtype.funcName == "count">
value.value++;
<#else>
<#elseif aggrtype.funcName == "any_value">
<#if type.outputType?ends_with("Interval")>
value.days = in.days;
value.months = in.months;
value.milliseconds = in.milliseconds;
<#elseif type.outputType?ends_with("IntervalDay")>
value.days = in.days;
value.milliseconds = in.milliseconds;
</#if>
<#else>
// TODO: throw an error ?
</#if>
<#if type.inputType?starts_with("Nullable")>
@@ -39,6 +39,8 @@
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.vector.complex.writer.*;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.*;
import javax.inject.Inject;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.holders.*;
@@ -124,6 +126,101 @@ public void reset() {
nonNullCount.value = 0;
}
}
<#elseif aggrtype.funcName.contains("any_value") && type.inputType?starts_with("Repeated")>
/**
 * any_value variant generated for input types whose name starts with "Repeated".
 * The result is emitted through a ComplexWriter; nonNullCount only records whether
 * a value has already been written (0 = not yet, 1 = written).
 */
@FunctionTemplate(name = "${aggrtype.funcName}",
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE)
public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc {
@Param ${type.inputType}Holder in;
@Output ComplexWriter writer;
@Workspace BigIntHolder nonNullCount;

/** Creates the flag holder; its value is not set explicitly here. */
public void setup() {
nonNullCount = new BigIntHolder();
}

/** Hands the first input seen to the writer and then keeps the flag at 1. */
@Override
public void add() {
if (nonNullCount.value == 0) {
// NOTE(review): presumably copies the repeated value from in.reader into the writer - confirm in MappifyUtility.
org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(in.reader, writer, "any_value");
}
nonNullCount.value = 1;
}

/** Intentionally empty: no output holder is populated in this class. */
@Override
public void output() {
}

/** Clears the "value already written" flag. */
@Override
public void reset() {
nonNullCount.value = 0;
}
}
<#elseif aggrtype.funcName.contains("any_value")>
/**
 * any_value variant generated for the remaining (non-"Repeated") decimal input types.
 * Keeps the first non-null input as a java.math.BigDecimal in an ObjectHolder, together
 * with that input's scale and precision, and writes it to the output holder in output().
 */
@FunctionTemplate(name = "${aggrtype.funcName}",
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE)
public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc {
@Param ${type.inputType}Holder in;
@Inject DrillBuf buffer;
@Workspace ObjectHolder value;
@Workspace IntHolder scale;
@Workspace IntHolder precision;
@Output ${type.outputType}Holder out;
@Workspace BigIntHolder nonNullCount;

/**
 * Creates the value and flag holders; the stored value starts as BigDecimal.ZERO.
 * NOTE(review): scale and precision are not assigned here - presumably the framework
 * instantiates these workspace holders; confirm.
 */
public void setup() {
value = new ObjectHolder();
value.obj = java.math.BigDecimal.ZERO;
nonNullCount = new BigIntHolder();
}

/**
 * Captures the first input seen (skipping nulls for nullable inputs); later inputs
 * only keep the flag at 1.
 */
@Override
public void add() {
<#if type.inputType?starts_with("Nullable")>
// Labeled block: breaking out of it skips the capture and the flag update below.
sout: {
if (in.isSet == 0) {
// processing nullable input and the value is null, so don't do anything...
break sout;
}
</#if>
if (nonNullCount.value == 0) {
// Decode the bytes in [in.start, in.end) of the input buffer using the input's scale.
value.obj=org.apache.drill.exec.util.DecimalUtility
.getBigDecimalFromDrillBuf(in.buffer,in.start,in.end-in.start,in.scale);
scale.value = in.scale;
precision.value = in.precision;
}
nonNullCount.value = 1;
<#if type.inputType?starts_with("Nullable")>
} // end of sout block
</#if>
}

/**
 * Emits the captured value, or a null (isSet = 0) when no value was captured.
 */
@Override
public void output() {
if (nonNullCount.value > 0) {
out.isSet = 1;
// Serialize only the unscaled value; scale and precision travel in the holder fields.
byte[] bytes = ((java.math.BigDecimal)value.obj).unscaledValue().toByteArray();
int len = bytes.length;
out.start = 0;
out.buffer = buffer.reallocIfNeeded(len);
out.buffer.setBytes(0, bytes);
out.end = len;
out.scale = scale.value;
out.precision = precision.value;
} else {
out.isSet = 0;
}
}

/**
 * Clears the captured state. The stored value becomes null here (not BigDecimal.ZERO as in
 * setup()); output() only dereferences it when nonNullCount is greater than 0.
 */
@Override
public void reset() {
scale.value = 0;
precision.value = 0;
value.obj = null;
nonNullCount.value = 0;
}
}
<#elseif aggrtype.funcName == "max" || aggrtype.funcName == "min">

@FunctionTemplate(name = "${aggrtype.funcName}",

0 comments on commit a27a104

Please sign in to comment.