Skip to content

Commit

Permalink
DRILL-4372: Expose the functions return type to Drill
Browse files Browse the repository at this point in the history
- Drill-Calite version update:
This commit needs to have Calcite's patch (CALCITE-1062) to plugin customized SqlOperator.

- FunctionTemplate
Add FunctionArgumentNumber annotation. This annotation element tells if the number of argument(s) is fixed or arbitrary (e.g., String concatenation function).

Due to this modification, there are some minor changes in DrillFuncHolder, DrillFunctionRegistry and FunctionAttributes.

- Checker
Add a new Checker (which Calcite uses to validate the legitimacy of the number of argument(s) for a function) to allow functions with arbitrary arguments to pass Caclite's validation

- Type conversion between Drill and Calcite
DrillConstExector is given a static method getDrillTypeFromCalcite() to convert Calcite types to Drill's.

- Extract function's return type inference
Unlike other functions, Extract function's return type can be determined solely based on the first argument. A logic is added in to allow this inference to happen

- DrillCalcite wrapper:
From the aspects of return type inference and argument type checks, Calcite's mechanism is very different from Drill's. In addition, currently, there is no straightforward way for Drill to plug-in customized mechanisms to Calcite. Thus, wrappers are provided to serve the objective.

Except for the mechanisms of type inference and argument type checks, these wrappers just forward any method calls to the wrapped SqlOpertor, SqlFuncion or SqlAggFunction to respond.

A interface DrillCalciteSqlWrapper is also added for the callers of the three wrappers to get the wrapped objects easier.

Due to these wrappers, UnsupportedOperatorsVisitor is modified in a minor manner.

- Calcite's SqlOpertor, SqlFuncion or SqlAggFunction are wrapped in DrillOperatorTable
Instead of returning Caclite's native SqlOpertor, SqlFuncion or SqlAggFunction, return the wrapped ones to ensure customized behaviors can be adopted.

- Type inference mechanism
This mechanism is used across all SqlOpertor, SqlFuncion or SqlAggFunction. Thus, it is factored out as its own method in TypeInferenceUtils

- Upgrade Drill-Calcite

Bump version number to 1.4.0-drill-test-r16

- Implement two argument version of lpad, rpad

- Implement one argument version of ltrim, rtrim, btrim
  • Loading branch information
hsuanyi committed Mar 17, 2016
1 parent 245da97 commit c029335
Show file tree
Hide file tree
Showing 38 changed files with 2,424 additions and 328 deletions.
Expand Up @@ -19,10 +19,8 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
Expand Down Expand Up @@ -264,27 +262,31 @@ public boolean isFieldReader(int i) {
return this.parameters[i].isFieldReader;
}

public MajorType getReturnType(List<LogicalExpression> args) {
public MajorType getReturnType(final List<LogicalExpression> logicalExpressions) {
if (returnValue.type.getMinorType() == MinorType.UNION) {
Set<MinorType> subTypes = Sets.newHashSet();
for (ValueReference ref : parameters) {
final Set<MinorType> subTypes = Sets.newHashSet();
for(final ValueReference ref : parameters) {
subTypes.add(ref.getType().getMinorType());
}
MajorType.Builder builder = MajorType.newBuilder().setMinorType(MinorType.UNION).setMode(DataMode.OPTIONAL);
for (MinorType subType : subTypes) {

final MajorType.Builder builder = MajorType.newBuilder()
.setMinorType(MinorType.UNION)
.setMode(DataMode.OPTIONAL);

for(final MinorType subType : subTypes) {
builder.addSubType(subType);
}
return builder.build();
}
if (nullHandling == NullHandling.NULL_IF_NULL) {

if(nullHandling == NullHandling.NULL_IF_NULL) {
// if any one of the input types is nullable, then return nullable return type
for (LogicalExpression e : args) {
if (e.getMajorType().getMode() == TypeProtos.DataMode.OPTIONAL) {
for(final LogicalExpression logicalExpression : logicalExpressions) {
if(logicalExpression.getMajorType().getMode() == TypeProtos.DataMode.OPTIONAL) {
return Types.optional(returnValue.type.getMinorType());
}
}
}

return returnValue.type;
}

Expand Down Expand Up @@ -405,7 +407,6 @@ public Class<?> getType() {
public String getName() {
return name;
}

}

public boolean checkPrecisionRange() {
Expand All @@ -419,5 +420,4 @@ public MajorType getReturnType() {
public ValueReference getReturnValue() {
return returnValue;
}

}
Expand Up @@ -17,40 +17,61 @@
*/
package org.apache.drill.exec.expr.fn;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.calcite.sql.SqlOperator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.expr.DrillFunc;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.planner.logical.DrillConstExecutor;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;

/**
* Registry of Drill functions.
*/
public class DrillFunctionRegistry {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFunctionRegistry.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFunctionRegistry.class);

// key: function name (lowercase) value: list of functions with that name
private final ArrayListMultimap<String, DrillFuncHolder> registeredFunctions = ArrayListMultimap.create();

private ArrayListMultimap<String, DrillFuncHolder> methods = ArrayListMultimap.create();
private static final ImmutableMap<String, Pair<Integer, Integer>> drillFuncToRange = ImmutableMap.<String, Pair<Integer, Integer>> builder()
// CONCAT is allowed to take [1, infinity) number of arguments.
// Currently, this flexibility is offered by DrillOptiq to rewrite it as
// a nested structure
.put("CONCAT", Pair.of(1, Integer.MAX_VALUE))

/* Hash map to prevent registering functions with exactly matching signatures
* key: Function Name + Input's Major Type
* Value: Class name where function is implemented
*/
private HashMap<String, String> functionSignatureMap = new HashMap<>();
// When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as
// another function based on the second argument (encodingType)
.put("LENGTH", Pair.of(1, 2))

// Dummy functions
.put("CONVERT_TO", Pair.of(2, 2))
.put("CONVERT_FROM", Pair.of(2, 2))
.put("FLATTEN", Pair.of(1, 1)).build();

public DrillFunctionRegistry(ScanResult classpathScan) {
FunctionConverter converter = new FunctionConverter();
List<AnnotatedClassDescriptor> providerClasses = classpathScan.getAnnotatedClasses();

// Hash map to prevent registering functions with exactly matching signatures
// key: Function Name + Input's Major Type
// value: Class name where function is implemented
//
final Map<String, String> functionSignatureMap = new HashMap<>();
for (AnnotatedClassDescriptor func : providerClasses) {
DrillFuncHolder holder = converter.getHolder(func);
if (holder != null) {
Expand All @@ -64,7 +85,7 @@ public DrillFunctionRegistry(ScanResult classpathScan) {
}
for (String name : names) {
String functionName = name.toLowerCase();
methods.put(functionName, holder);
registeredFunctions.put(functionName, holder);
String functionSignature = functionName + functionInput;
String existingImplementation;
if ((existingImplementation = functionSignatureMap.get(functionSignature)) != null) {
Expand All @@ -84,46 +105,62 @@ public DrillFunctionRegistry(ScanResult classpathScan) {
}
if (logger.isTraceEnabled()) {
StringBuilder allFunctions = new StringBuilder();
for (DrillFuncHolder method: methods.values()) {
for (DrillFuncHolder method: registeredFunctions.values()) {
allFunctions.append(method.toString()).append("\n");
}
logger.trace("Registered functions: [\n{}]", allFunctions);
}
}

public int size(){
return methods.size();
return registeredFunctions.size();
}

/** Returns functions with given name. Function name is case insensitive. */
public List<DrillFuncHolder> getMethods(String name) {
return this.methods.get(name.toLowerCase());
return this.registeredFunctions.get(name.toLowerCase());
}

public void register(DrillOperatorTable operatorTable) {
SqlOperator op;
for (Entry<String, Collection<DrillFuncHolder>> function : methods.asMap().entrySet()) {
Set<Integer> argCounts = Sets.newHashSet();
String name = function.getKey().toUpperCase();
for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.asMap().entrySet()) {
final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create();
final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create();
final String name = function.getKey().toUpperCase();
boolean isDeterministic = true;
for (DrillFuncHolder func : function.getValue()) {
if (argCounts.add(func.getParamCount())) {
if (func.isAggregating()) {
op = new DrillSqlAggOperator(name, func.getParamCount());
final int paramCount = func.getParamCount();
if(func.isAggregating()) {
aggregateFunctions.put(paramCount, func);
} else {
final Pair<Integer, Integer> argNumberRange;
if(drillFuncToRange.containsKey(name)) {
argNumberRange = drillFuncToRange.get(name);
} else {
boolean isDeterministic;
// prevent Drill from folding constant functions with types that cannot be materialized
// into literals
if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) {
isDeterministic = false;
} else {
isDeterministic = func.isDeterministic();
}
op = new DrillSqlOperator(name, func.getParamCount(), func.getReturnType(), isDeterministic);
argNumberRange = Pair.of(func.getParamCount(), func.getParamCount());
}
operatorTable.add(function.getKey(), op);
functions.put(argNumberRange, func);
}

if(!func.isDeterministic()) {
isDeterministic = false;
}
}
for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) {
final DrillSqlOperator drillSqlOperator;
final Pair<Integer, Integer> range = entry.getKey();
final int max = range.getRight();
final int min = range.getLeft();
drillSqlOperator = new DrillSqlOperator(
name,
Lists.newArrayList(entry.getValue()),
min,
max,
isDeterministic);
operatorTable.add(name, drillSqlOperator);
}
for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) {
operatorTable.add(name, new DrillSqlAggOperator(name, Lists.newArrayList(entry.getValue()), entry.getKey()));
}
}
}

}
Expand Up @@ -19,6 +19,7 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,6 +41,11 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

/**
* This class offers the registry for functions. Notably, in addition to Drill its functions
* (in {@link DrillFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry})
* is also registered in this class
*/
public class FunctionImplementationRegistry implements FunctionLookupContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);

Expand Down

0 comments on commit c029335

Please sign in to comment.