Skip to content

Commit

Permalink
DRILL-3993: Changes to support Calcite 1.15.
Browse files Browse the repository at this point in the history
Fix AssertionError: type mismatch for tests with aggregate functions.
Fix VARIANCE agg function
Remove using deprecated Subtype enum
Fix 'Failure while loading table a in database hbase' error
Fix 'Field ordinal 1 is invalid for  type '(DrillRecordRow[*])'' unit test failures
  • Loading branch information
vvysotskyi committed Jan 16, 2018
1 parent 9274cb9 commit d59f0cd
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 94 deletions.
Expand Up @@ -72,7 +72,16 @@ public Set<String> getSubSchemaNames() {
@Override
public Table getTable(String name) {
HBaseScanSpec scanSpec = new HBaseScanSpec(name);
return new DrillHBaseTable(schemaName, plugin, scanSpec);
try {
return new DrillHBaseTable(schemaName, plugin, scanSpec);
} catch (Exception e) {
// Calcite firstly is looking for a table in the default schema, if a table was not found,
// it is looking in root schema.
// If a table does not exist, a query will fail at validation stage,
// so the error should not be thrown there.
logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
return null;
}
}

@Override
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -17,14 +17,16 @@
*/
package org.apache.drill.exec.store.hive.schema;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
Expand All @@ -37,15 +39,15 @@
import java.util.Set;

public class HiveDatabaseSchema extends AbstractSchema{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDatabaseSchema.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDatabaseSchema.class);

private final HiveSchema hiveSchema;
private Set<String> tables;
private final DrillHiveMetaStoreClient mClient;
private final SchemaConfig schemaConfig;

public HiveDatabaseSchema( //
HiveSchema hiveSchema, //
public HiveDatabaseSchema(
HiveSchema hiveSchema,
String name,
DrillHiveMetaStoreClient mClient,
SchemaConfig schemaConfig) {
Expand Down Expand Up @@ -126,6 +128,17 @@ public Statistic getStatistic() {
public Schema.TableType getJdbcTableType() {
return tableType;
}

@Override
public boolean rolledUpColumnValidInsideAgg(String column,
SqlCall call, SqlNode parent, CalciteConnectionConfig config) {
return true;
}

@Override
public boolean isRolledUp(String column) {
return false;
}
}

}
Expand Up @@ -61,7 +61,16 @@ class OpenTSDBSchema extends AbstractSchema {
@Override
public Table getTable(String name) {
OpenTSDBScanSpec scanSpec = new OpenTSDBScanSpec(name);
try {
return new DrillOpenTSDBTable(schemaName, plugin, new Schema(plugin.getClient(), name), scanSpec);
} catch (Exception e) {
// Calcite firstly is looking for a table in the default schema, if a table was not found,
// it is looking in root schema.
// If a table does not exist, a query will fail at validation stage,
// so the error should not be thrown there.
logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
return null;
}
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions exec/java-exec/src/main/codegen/includes/parserImpls.ftl
Expand Up @@ -154,19 +154,19 @@ SqlNodeList ParseOptionalFieldList(String relType) :
/** Parses a required field list and makes sure no field is a "*". */
SqlNodeList ParseRequiredFieldList(String relType) :
{
SqlNodeList fieldList;
Pair<SqlNodeList, SqlNodeList> fieldList;
}
{
<LPAREN>
fieldList = ParenthesizedCompoundIdentifierList()
<RPAREN>
{
for(SqlNode node : fieldList)
for(SqlNode node : fieldList.left)
{
if (((SqlIdentifier)node).isStar())
if (((SqlIdentifier) node).isStar())
throw new ParseException(String.format("%s's field list has a '*', which is invalid.", relType));
}
return fieldList;
return fieldList.left;
}
}

Expand Down Expand Up @@ -357,7 +357,7 @@ SqlNode SqlDropFunction() :
/**
* Parses a comma-separated list of simple identifiers.
*/
SqlNodeList ParenthesizedCompoundIdentifierList() :
Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() :
{
List<SqlIdentifier> list = new ArrayList<SqlIdentifier>();
SqlIdentifier id;
Expand All @@ -367,7 +367,7 @@ SqlNodeList ParenthesizedCompoundIdentifierList() :
(
<COMMA> id = SimpleIdentifier() {list.add(id);}) *
{
return new SqlNodeList(list, getPos());
return Pair.of(new SqlNodeList(list, getPos()), null);
}
}
</#if>
Expand Up @@ -32,12 +32,14 @@
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.fun.SqlCountAggFunction;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.trace.CalciteTrace;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillCalciteSqlAggFunctionWrapper;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;
Expand Down Expand Up @@ -234,7 +236,7 @@ private RexNode reduceAgg(
return reduceSum(oldAggRel, oldCall, newCalls, aggCallMapping);
}
if (sqlAggFunction instanceof SqlAvgAggFunction) {
final SqlAvgAggFunction.Subtype subtype = ((SqlAvgAggFunction) sqlAggFunction).getSubtype();
final SqlKind subtype = sqlAggFunction.getKind();
switch (subtype) {
case AVG:
// replace original AVG(x) with SUM(x) / COUNT(x)
Expand Down Expand Up @@ -314,14 +316,21 @@ private RexNode reduceAvg(
oldAggRel.getInput(),
iAvgInput);
RelDataType sumType =
TypeInferenceUtils.getDrillSqlReturnTypeInference(SqlKind.SUM.name(),
ImmutableList.<DrillFuncHolder>of())
.inferReturnType(oldCall.createBinding(oldAggRel));
sumType =
typeFactory.createTypeWithNullability(
avgInputType,
avgInputType.isNullable() || nGroups == 0);
SqlAggFunction sumAgg = new SqlSumEmptyIsZeroAggFunction();
AggregateCall sumCall = AggregateCall.create(sumAgg, oldCall.isDistinct(), oldCall.getArgList(), -1, sumType, null);
sumType,
sumType.isNullable() || nGroups == 0);
SqlAggFunction sumAgg =
new DrillCalciteSqlAggFunctionWrapper(new SqlSumEmptyIsZeroAggFunction(), sumType);
AggregateCall sumCall = AggregateCall.create(sumAgg, oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.getArgList(), -1, sumType, null);
final SqlCountAggFunction countAgg = (SqlCountAggFunction) SqlStdOperatorTable.COUNT;
final RelDataType countType = countAgg.getReturnType(typeFactory);
AggregateCall countCall = AggregateCall.create(countAgg, oldCall.isDistinct(), oldCall.getArgList(), -1, countType, null);
AggregateCall countCall = AggregateCall.create(countAgg, oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.getArgList(), -1, countType, null);

RexNode tmpsumRef =
rexBuilder.addAggCall(
Expand Down Expand Up @@ -370,7 +379,7 @@ private RexNode reduceAvg(
newCalls,
aggCallMapping,
ImmutableList.of(avgInputType));
if(isInferenceEnabled) {
if (isInferenceEnabled) {
return rexBuilder.makeCall(
new DrillSqlOperator(
"divide",
Expand Down Expand Up @@ -408,20 +417,21 @@ private RexNode reduceSum(
arg);
final RelDataType sumType;
final SqlAggFunction sumZeroAgg;
if(isInferenceEnabled) {
if (isInferenceEnabled) {
sumType = oldCall.getType();
sumZeroAgg = new DrillCalciteSqlAggFunctionWrapper(
new SqlSumEmptyIsZeroAggFunction(), sumType);
} else {
sumType =
typeFactory.createTypeWithNullability(
argType, argType.isNullable());
sumZeroAgg = new SqlSumEmptyIsZeroAggFunction();
oldCall.getType(), argType.isNullable());
}
AggregateCall sumZeroCall =AggregateCall.create(sumZeroAgg, oldCall.isDistinct(), oldCall.getArgList(), -1, sumType, null);
sumZeroAgg = new DrillCalciteSqlAggFunctionWrapper(
new SqlSumEmptyIsZeroAggFunction(), sumType);
AggregateCall sumZeroCall = AggregateCall.create(sumZeroAgg, oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.getArgList(), -1, sumType, null);
final SqlCountAggFunction countAgg = (SqlCountAggFunction) SqlStdOperatorTable.COUNT;
final RelDataType countType = countAgg.getReturnType(typeFactory);
AggregateCall countCall = AggregateCall.create(countAgg, oldCall.isDistinct(), oldCall.getArgList(), -1, countType, null);
AggregateCall countCall = AggregateCall.create(countAgg, oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.getArgList(), -1, countType, null);
// NOTE: these references are with respect to the output
// of newAggRel
RexNode sumZeroRef =
Expand Down Expand Up @@ -495,14 +505,17 @@ private RexNode reduceStddev(
SqlStdOperatorTable.MULTIPLY, argRef, argRef);
final int argSquaredOrdinal = lookupOrAdd(inputExprs, argSquared);

final RelDataType sumType =
typeFactory.createTypeWithNullability(
argType,
true);
RelDataType sumType =
TypeInferenceUtils.getDrillSqlReturnTypeInference(SqlKind.SUM.name(),
ImmutableList.<DrillFuncHolder>of())
.inferReturnType(oldCall.createBinding(oldAggRel));
sumType = typeFactory.createTypeWithNullability(sumType, true);
final AggregateCall sumArgSquaredAggCall =
AggregateCall.create(
new SqlSumAggFunction(sumType),
new DrillCalciteSqlAggFunctionWrapper(
new SqlSumAggFunction(sumType), sumType),
oldCall.isDistinct(),
oldCall.isApproximate(),
ImmutableIntList.of(argSquaredOrdinal),
-1,
sumType,
Expand All @@ -518,8 +531,10 @@ private RexNode reduceStddev(

final AggregateCall sumArgAggCall =
AggregateCall.create(
new SqlSumAggFunction(sumType),
new DrillCalciteSqlAggFunctionWrapper(
new SqlSumAggFunction(sumType), sumType),
oldCall.isDistinct(),
oldCall.isApproximate(),
ImmutableIntList.of(argOrdinal),
-1,
sumType,
Expand All @@ -539,7 +554,8 @@ private RexNode reduceStddev(

final SqlCountAggFunction countAgg = (SqlCountAggFunction) SqlStdOperatorTable.COUNT;
final RelDataType countType = countAgg.getReturnType(typeFactory);
final AggregateCall countArgAggCall = AggregateCall.create(countAgg, oldCall.isDistinct(), oldCall.getArgList(), -1, countType, null);
final AggregateCall countArgAggCall = AggregateCall.create(countAgg, oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.getArgList(), -1, countType, null);
final RexNode countArg =
rexBuilder.addAggCall(
countArgAggCall,
Expand All @@ -566,7 +582,7 @@ private RexNode reduceStddev(
final RexLiteral one =
rexBuilder.makeExactLiteral(BigDecimal.ONE);
final RexNode nul =
rexBuilder.makeNullLiteral(countArg.getType().getSqlTypeName());
rexBuilder.makeNullLiteral(countArg.getType());
final RexNode countMinusOne =
rexBuilder.makeCall(
SqlStdOperatorTable.MINUS, countArg, one);
Expand All @@ -580,7 +596,7 @@ private RexNode reduceStddev(
}

final SqlOperator divide;
if(isInferenceEnabled) {
if (isInferenceEnabled) {
divide = new DrillSqlOperator(
"divide",
2,
Expand All @@ -603,7 +619,7 @@ private RexNode reduceStddev(
SqlStdOperatorTable.POWER, div, half);
}

if(isInferenceEnabled) {
if (isInferenceEnabled) {
return result;
} else {
/*
Expand Down Expand Up @@ -670,7 +686,7 @@ public DrillConvertSumToSumZero(RelOptRuleOperand operand) {
public boolean matches(RelOptRuleCall call) {
DrillAggregateRel oldAggRel = (DrillAggregateRel) call.rels[0];
for (AggregateCall aggregateCall : oldAggRel.getAggCallList()) {
if(isConversionToSumZeroNeeded(aggregateCall.getAggregation(), aggregateCall.getType())) {
if (isConversionToSumZeroNeeded(aggregateCall.getAggregation(), aggregateCall.getType())) {
return true;
}
}
Expand All @@ -684,7 +700,7 @@ public void onMatch(RelOptRuleCall call) {
final Map<AggregateCall, RexNode> aggCallMapping = Maps.newHashMap();
final List<AggregateCall> newAggregateCalls = Lists.newArrayList();
for (AggregateCall oldAggregateCall : oldAggRel.getAggCallList()) {
if(isConversionToSumZeroNeeded(oldAggregateCall.getAggregation(), oldAggregateCall.getType())) {
if (isConversionToSumZeroNeeded(oldAggregateCall.getAggregation(), oldAggregateCall.getType())) {
final RelDataType argType = oldAggregateCall.getType();
final RelDataType sumType = oldAggRel.getCluster().getTypeFactory()
.createTypeWithNullability(argType, argType.isNullable());
Expand All @@ -694,6 +710,7 @@ public void onMatch(RelOptRuleCall call) {
AggregateCall.create(
sumZeroAgg,
oldAggregateCall.isDistinct(),
oldAggregateCall.isApproximate(),
oldAggregateCall.getArgList(),
-1,
sumType,
Expand Down Expand Up @@ -733,9 +750,9 @@ public DrillConvertWindowSumToSumZero(RelOptRuleOperand operand) {
@Override
public boolean matches(RelOptRuleCall call) {
final DrillWindowRel oldWinRel = (DrillWindowRel) call.rels[0];
for(Window.Group group : oldWinRel.groups) {
for(Window.RexWinAggCall rexWinAggCall : group.aggCalls) {
if(isConversionToSumZeroNeeded(rexWinAggCall.getOperator(), rexWinAggCall.getType())) {
for (Window.Group group : oldWinRel.groups) {
for (Window.RexWinAggCall rexWinAggCall : group.aggCalls) {
if (isConversionToSumZeroNeeded(rexWinAggCall.getOperator(), rexWinAggCall.getType())) {
return true;
}
}
Expand All @@ -748,10 +765,10 @@ public void onMatch(RelOptRuleCall call) {
final DrillWindowRel oldWinRel = (DrillWindowRel) call.rels[0];
final ImmutableList.Builder<Window.Group> builder = ImmutableList.builder();

for(Window.Group group : oldWinRel.groups) {
for (Window.Group group : oldWinRel.groups) {
final List<Window.RexWinAggCall> aggCalls = Lists.newArrayList();
for(Window.RexWinAggCall rexWinAggCall : group.aggCalls) {
if(isConversionToSumZeroNeeded(rexWinAggCall.getOperator(), rexWinAggCall.getType())) {
for (Window.RexWinAggCall rexWinAggCall : group.aggCalls) {
if (isConversionToSumZeroNeeded(rexWinAggCall.getOperator(), rexWinAggCall.getType())) {
final RelDataType argType = rexWinAggCall.getType();
final RelDataType sumType = oldWinRel.getCluster().getTypeFactory()
.createTypeWithNullability(argType, argType.isNullable());
Expand Down Expand Up @@ -792,7 +809,7 @@ public void onMatch(RelOptRuleCall call) {

private static boolean isConversionToSumZeroNeeded(SqlOperator sqlOperator, RelDataType type) {
sqlOperator = DrillCalciteWrapperUtility.extractSqlOperatorFromWrapper(sqlOperator);
if(sqlOperator instanceof SqlSumAggFunction
if (sqlOperator instanceof SqlSumAggFunction
&& !type.isNullable()) {
// If SUM(x) is not nullable, the validator must have determined that
// nulls are impossible (because the group is never empty and x is never
Expand Down

0 comments on commit d59f0cd

Please sign in to comment.