[FLINK-3728] [tableAPI] Improve error message and documentation for unsupported SQL features.

This closes apache#2018
fhueske committed May 24, 2016
1 parent c0b7453 commit 6091973
Showing 8 changed files with 33 additions and 63 deletions.
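
For context, this is roughly what the change means for a user: a query that the batch SQL translator cannot plan now fails with a TableException whose message explains the situation, instead of an internal PlanGenException asking for a bug report. A minimal sketch, assuming the 1.1-era Scala Table API; the table names, data, and registerDataSet calls are illustrative, not from the commit:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment, TableException}

object UnsupportedSqlFeatureExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Register two small tables (names and schema are made up for this sketch).
    tEnv.registerDataSet("t1", env.fromElements((1, "a"), (2, "b")), 'x, 'y)
    tEnv.registerDataSet("t2", env.fromElements((1, "c")), 'u, 'v)

    try {
      // FULL OUTER JOIN is one of the SQL features the batch translator rejects at this point.
      tEnv.sql("SELECT x, v FROM t1 FULL OUTER JOIN t2 ON x = u").toDataSet[Row].collect()
    } catch {
      case e: TableException =>
        // The message now contains the failing plan and points to the documented feature set.
        println(e.getMessage)
    }
  }
}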
BatchTableEnvironment.scala

@@ -31,7 +31,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -252,15 +251,17 @@ abstract class BatchTableEnvironment(
     }
     catch {
       case e: CannotPlanException =>
-        throw new PlanGenException(
+        throw new TableException(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
             s"${RelOptUtil.toString(relNode)}\n" +
-            "Please consider filing a bug report.", e)
+            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
       case t: TableException =>
-        throw new PlanGenException(
+        throw new TableException(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
             s"${RelOptUtil.toString(relNode)}\n" +
-            t.msg)
+            s"${t.msg}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
       case a: AssertionError =>
         throw a.getCause
     }
StreamTableEnvironment.scala

@@ -27,7 +27,6 @@ import org.apache.calcite.tools.Programs
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -255,10 +254,11 @@ abstract class StreamTableEnvironment(
     }
     catch {
       case e: CannotPlanException =>
-        throw new PlanGenException(
+        throw new TableException(
          s"Cannot generate a valid execution plan for the given query: \n\n" +
            s"${RelOptUtil.toString(relNode)}\n" +
-           "Please consider filing a bug report.", e)
+           s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+           s"Please check the documentation for the set of currently supported SQL features.")
     }
 
     dataStreamPlan match {
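
Both environments now funnel planner failures into the public TableException rather than the internal PlanGenException; note that the CannotPlanException cause is no longer attached, the message carries the plan text instead. Since the catch block is duplicated across BatchTableEnvironment and StreamTableEnvironment, it could in principle be shared; a hypothetical refactoring sketch, not part of this commit:

import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.table.TableException

object PlannerErrors {
  // Hypothetical shared handler; Scala's try/catch accepts a PartialFunction directly.
  def wrapPlannerFailure(relNode: RelNode): PartialFunction[Throwable, Nothing] = {
    case _: CannotPlanException =>
      throw new TableException(
        s"Cannot generate a valid execution plan for the given query: \n\n" +
          s"${RelOptUtil.toString(relNode)}\n" +
          "This exception indicates that the query uses an unsupported SQL feature.\n" +
          "Please check the documentation for the set of currently supported SQL features.")
  }

  // Usage at the translate() call site (optimizeAndTranslate is a stand-in name):
  //   val plan = try { optimizeAndTranslate(relNode) } catch wrapPlannerFailure(relNode)
}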
main/scala/org/apache/flink/api/table/plan/PlanGenException.scala (26 changes: 0 additions & 26 deletions)
This file was deleted.
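
The deleted file's contents are not shown. For orientation, the removed class was most likely a plain RuntimeException wrapper along these lines; a reconstruction, not the verbatim file:

package org.apache.flink.api.table.plan

// Plausible shape of the removed internal exception, superseded by the public TableException.
class PlanGenException(message: String, cause: Throwable)
  extends RuntimeException(message, cause) {

  def this(message: String) = this(message, null)
}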

AggregateUtil.scala

@@ -26,11 +26,10 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
+import org.apache.flink.api.table.typeutils.TypeConverter
 import TypeConverter._
+import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableConfig}
+import org.apache.flink.api.table.{TableException, Row, TableConfig}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -96,7 +95,7 @@ object AggregateUtil {
 
     if (groupingOffsetMapping.length != groupings.length ||
       aggOffsetMapping.length != namedAggregates.length) {
-      throw new PlanGenException("Could not find output field in input data type " +
+      throw new TableException("Could not find output field in input data type " +
         "or aggregate functions.")
     }
 
@@ -138,11 +137,11 @@
       if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
        aggFieldIndexes(index) = 0
      } else {
-       throw new PlanGenException("Aggregate fields should not be empty.")
+       throw new TableException("Aggregate fields should not be empty.")
      }
    } else {
      if (argList.size() > 1) {
-       throw new PlanGenException("Currently, do not support aggregate on multi fields.")
+       throw new TableException("Currently, do not support aggregate on multi fields.")
      }
      aggFieldIndexes(index) = argList.get(0)
    }
@@ -163,7 +162,7 @@
          case DOUBLE =>
            new DoubleSumAggregate
          case sqlType: SqlTypeName =>
-           throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
+           throw new TableException("Sum aggregate does no support type:" + sqlType)
        }
      }
      case _: SqlAvgAggFunction => {
@@ -181,7 +180,7 @@
          case DOUBLE =>
            new DoubleAvgAggregate
          case sqlType: SqlTypeName =>
-           throw new PlanGenException("Avg aggregate does no support type:" + sqlType)
+           throw new TableException("Avg aggregate does no support type:" + sqlType)
        }
      }
      case sqlMinMaxFunction: SqlMinMaxAggFunction => {
@@ -200,7 +199,7 @@
          case DOUBLE =>
            new DoubleMinAggregate
          case sqlType: SqlTypeName =>
-           throw new PlanGenException("Min aggregate does no support type:" + sqlType)
+           throw new TableException("Min aggregate does no support type:" + sqlType)
        }
      } else {
        sqlTypeName match {
@@ -217,14 +216,14 @@
          case DOUBLE =>
            new DoubleMaxAggregate
          case sqlType: SqlTypeName =>
-           throw new PlanGenException("Max aggregate does no support type:" + sqlType)
+           throw new TableException("Max aggregate does no support type:" + sqlType)
        }
      }
    }
    case _: SqlCountAggFunction =>
      aggregates(index) = new CountAggregate
    case unSupported: SqlAggFunction =>
-     throw new PlanGenException("unsupported Function: " + unSupported.getName)
+     throw new TableException("unsupported Function: " + unSupported.getName)
  }
  setAggregateDataOffset(index)
 }
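
The AggregateUtil hunks swap the exception type in the dispatch that maps Calcite aggregate calls onto Flink's typed aggregate implementations, so both unsupported functions and unsupported operand types now surface as TableException. Illustrative queries follow; the table, its columns, and the exact failure points are assumptions inferred from the match cases above, not taken from the commit:

// Assuming a registered table MyTable with columns a INT and d DECIMAL:
tEnv.sql("SELECT SUM(a) FROM MyTable")        // supported: single-field SUM on an integer column
tEnv.sql("SELECT SUM(d) FROM MyTable")        // DECIMAL has no match case: "Sum aggregate does no support type: ..."
tEnv.sql("SELECT STDDEV_POP(a) FROM MyTable") // no case for this function: "unsupported Function: ..."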
AggregationsITCase.scala

@@ -23,8 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -213,7 +212,7 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testDistinctAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -228,7 +227,7 @@
     tEnv.sql(sqlQuery).toDataSet[Row]
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testGroupedDistinctAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -243,7 +242,7 @@
     tEnv.sql(sqlQuery).toDataSet[Row]
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testGroupingSetAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
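
These tests assert only the exception type via JUnit 4's expected attribute. If one also wanted to pin down the improved message text, a try/catch with an explicit assertion would work. A sketch under stated assumptions: the registerDataSet overload, the default _1 field name for Scala tuples, and the exact message wording are assumed, and the test body here is illustrative rather than the elided test code above:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
import org.junit.{Assert, Test}

class AggregationErrorMessageTest {

  @Test
  def testDistinctAggregateMessage(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // Tuple fields of the test data set are assumed to be exposed as _1, _2, _3.
    tEnv.registerDataSet("MyTable", CollectionDataSets.get3TupleDataSet(env))

    try {
      tEnv.sql("SELECT COUNT(DISTINCT _1) FROM MyTable").toDataSet[Row].collect()
      Assert.fail("TableException expected")
    } catch {
      case e: TableException =>
        // Checks for the documentation pointer added by this commit (wording assumed).
        Assert.assertTrue(e.getMessage.contains("supported SQL features"))
    }
  }
}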
JoinITCase.scala (SQL tests)

@@ -24,7 +24,6 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -190,7 +189,7 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testJoinNoEqualityPredicate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -246,7 +245,7 @@
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testFullOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -262,7 +261,7 @@
     tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testLeftOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -278,7 +277,7 @@
     tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testRightOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
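
These SQL join tests encode the batch translator's limits at this point: joins need at least one equality predicate (the hash and sort-merge join strategies key on it), and outer joins are not yet executable. Illustrative query shapes, with tables t1(a, b) and t2(c, d) assumed, since the actual test queries are elided above:

tEnv.sql("SELECT * FROM t1 JOIN t2 ON a = c")            // OK: equi-join predicate present
tEnv.sql("SELECT * FROM t1, t2 WHERE a < c")             // TableException: no equality join predicate
tEnv.sql("SELECT * FROM t1 FULL OUTER JOIN t2 ON a = c") // TableException: outer joins unsupported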
SortITCase.scala

@@ -25,8 +25,7 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala._
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -61,7 +60,7 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testOrderByOffset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -73,7 +72,7 @@
     tEnv.sql(sqlQuery).toDataSet[Row]
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testOrderByFirst(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
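
SortITCase pairs plain sorting, which works, against the two limit clauses the translator cannot yet execute. Likely query shapes, inferred from the test names since the test bodies are truncated above (table t(a) assumed):

tEnv.sql("SELECT * FROM t ORDER BY a")                         // supported: global sort
tEnv.sql("SELECT * FROM t ORDER BY a OFFSET 2 ROWS")           // TableException: OFFSET not supported
tEnv.sql("SELECT * FROM t ORDER BY a FETCH FIRST 2 ROWS ONLY") // TableException: FETCH not supported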
JoinITCase.scala (Table API tests)

@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment}
+import org.apache.flink.api.table.{TableException, ValidationException, Row, TableEnvironment}
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -140,7 +139,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
       .select('c, 'g)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate1(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -154,7 +153,7 @@
     .select('c, 'g).collect()
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate2(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
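
The same equality-predicate rule applies to the Table API variant of JoinITCase; the predicate is only checked when the plan is translated (at collect()), not when where() is called. A sketch, with two tables ds1('a, 'b, 'c) and ds2('d, 'e, 'f, 'g, 'h) assumed to match the selects in the tests above:

val joined = ds1.join(ds2)
joined.where('b === 'e).select('c, 'g).collect() // OK: equality predicate
joined.where('b < 'e).select('c, 'g).collect()   // TableException during translation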
