diff --git a/docs/apis/table.md b/docs/apis/table.md
index 276341db154df..12b0e19b4b83a 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -31,7 +31,7 @@ under the License.
The Table API is a SQL-like expression language for relational stream and batch processing that can be easily embedded in Flink's DataSet and DataStream APIs (Java and Scala). The Table API and SQL interface operate on a relational `Table` abstraction, which can be created from external data sources, or existing DataSets and DataStreams. With the Table API, you can apply relational operators such as selection, aggregation, and joins on `Table`s.

-`Table`s can also be queried with regular SQL, as long as they are registered (see [Registering and Accessing Tables](#registering-and-accessing-tables)). The Table API and SQL offer equivalent functionality and can be mixed in the same program. When a `Table` is converted back into a `DataSet` or `DataStream`, the logical plan, which was defined by relational operators and SQL queries, is optimized using [Apache Calcite](https://calcite.apache.org/) and transformed into a `DataSet` or `DataStream` execution plan.
+`Table`s can also be queried with regular SQL, as long as they are registered (see [Registering Tables](#registering-tables)). The Table API and SQL offer equivalent functionality and can be mixed in the same program. When a `Table` is converted back into a `DataSet` or `DataStream`, the logical plan, which was defined by relational operators and SQL queries, is optimized using [Apache Calcite](https://calcite.apache.org/) and transformed into a `DataSet` or `DataStream` program.

* This will be replaced by the TOC
{:toc}

@@ -50,15 +50,15 @@ The following dependency must be added to your project in order to use the Table
{% endhighlight %}

-Note that the Table API is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+*Note: The Table API is currently not part of the binary distribution. See how to link with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).*

Registering Tables
--------------------------------

-`TableEnvironment`s have an internal table catalog to which tables can be registered with a unique name. After registration, a table can be accessed from the `TableEnvironment` by its name. Tables can be registered in different ways.
+`TableEnvironment`s have an internal table catalog to which tables can be registered with a unique name. After registration, a table can be accessed from the `TableEnvironment` by its name.
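For illustration, here is a minimal sketch of the register-then-access workflow described above (the example data is invented; the `Orders` schema follows the examples later in this document):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Table, TableEnvironment}

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a DataSet in the internal catalog under the unique name "Orders"
// (the data here is illustrative only)
val orders: DataSet[(Long, String, Int)] = env.fromElements((1L, "Rubber", 3))
tableEnv.registerDataSet("Orders", orders, 'user, 'product, 'amount)

// after registration, the table is accessible by its name, e.g., from SQL
val result: Table = tableEnv.sql("SELECT user, product, amount FROM Orders")
{% endhighlight %}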
-*Note that it is not required to register a `DataSet` or `DataStream` as a table in a `TableEnvironment` in order to process it with the Table API.*
+*Note: `DataSet`s or `DataStream`s can be directly converted into `Table`s without registering them in the `TableEnvironment`.*

### Register a DataSet

@@ -92,7 +92,7 @@ tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

-*Note: DataSet table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as these are reserved for internal use only.*
+*Note: The name of a `DataSet` `Table` must not match the `^_DataSetTable_[0-9]+` pattern, which is reserved for internal use only.*

### Register a DataStream

@@ -126,7 +126,7 @@ tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

-*Note: DataStream table names are not allowed to follow the `^_DataStreamTable_[0-9]+` pattern, as these are reserved for internal use only.*
+*Note: The name of a `DataStream` `Table` must not match the `^_DataStreamTable_[0-9]+` pattern, which is reserved for internal use only.*

### Register a Table

@@ -214,7 +214,7 @@ The Table API provides methods to apply relational operations on DataSets and Da

The central concept of the Table API is a `Table` which represents a table with relational schema (or relation). Tables can be created from a `DataSet` or `DataStream`, converted into a `DataSet` or `DataStream`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.

-*Note that the only operations currently supported on streaming Tables are selection, filtering, and union.*
+*Note: The only operations currently supported on streaming Tables are selection, projection, and union.*
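As an illustrative sketch of these relational operators on a batch `Table` (the `toTable` conversion and the expression syntax are assumed to match this version of the Scala Table API; the data is invented):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a Table directly from a DataSet (no registration required)
val orders = env.fromElements((1L, "Rubber", 3), (2L, "Glue", 5))
  .toTable(tableEnv, 'user, 'product, 'amount)

// selection, projection, and aggregation with relational operators
val totals = orders
  .filter('amount > 2)
  .groupBy('product)
  .select('product, 'amount.sum as 'total)
{% endhighlight %}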
@@ -639,17 +639,13 @@ column names and function names follow Java identifier syntax. Expressions speci
SQL
----

-Registered `Table`s can be directly queried with SQL and SQL queries can also be mixed with Table API expressions. Table API and SQL statements will be translated into a single optimized DataStream or DataSet program.
+SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`, which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or emitted to a `TableSink` (see [Emitting Tables to External Sinks](#emitting-tables-to-external-sinks)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program.

-*Note: The current SQL implementation is not feature complete. Outer joins, distinct aggregates, date and decimal data types are currently not supported. However, all operations supported by the Table API are also supported by SQL.*
+A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)).

-In order to use a `Table`, `DataSet`, `DataStream`, or external `TableSource` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
-A registered table can be accessed from a `TableEnvironment` using the `sql()` method of the `TableEnvironment`:
-
-- `tEnv.sql(SELECT * FROM tName)` executes the SQL query on the corresponding tables which were registered in a `TableEnvironment`.
-
-This method returns a new `Table` which can be converted back to a `DataSet`, or `DataStream`, or used in subsequent Table API queries.
+*Note: Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.*
+
+### SQL on Batch Tables
@@ -658,11 +654,12 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// read a DataSet from an external source
-DataSet> ds = env.readCsvFile(...);
+DataSet<Tuple3<Long, String, Integer>> ds = env.readCsvFile(...);
// register the DataSet as table "Orders"
tableEnv.registerDataSet("Orders", ds, "user, product, amount");
-// run a SQL query and retrieve the result in a new Table
-Table result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10");
+// run a SQL query on the Table and retrieve the result as a new Table
+Table result = tableEnv.sql(
+  "SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
@@ -672,20 +669,75 @@ val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
-val ds = env.readCsvFile(...)
+val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
// register the DataSet under the name "Orders"
tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
-// run a SQL query and retrieve the result in a new Table
-val result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10")
+// run a SQL query on the Table and retrieve the result as a new Table
+val result = tableEnv.sql(
+  "SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>
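Because SQL and Table API queries can be mixed, the `Table` returned by `sql()` can feed directly into Table API operators. A sketch continuing the Scala example above (`toDataSet[Row]` follows the usage in this patch's tests):

{% highlight scala %}
import org.apache.flink.api.table.Row

// refine the SQL result with Table API operators, then convert to a DataSet
val filtered = tableEnv.sql("SELECT user, product, amount FROM Orders")
  .filter('amount > 2)
  .select('user, 'product)

val rows: DataSet[Row] = filtered.toDataSet[Row]
{% endhighlight %}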
-SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyword before the table name. Please refer to the [Apache Calcite SQL Streaming documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax.
+#### Limitations
+
+The current version supports selection (filter), projection, inner equi-joins, grouping, non-distinct aggregates, and sorting on batch tables.
+
+Among others, the following SQL features are not supported yet:
+
+- Time data types (`DATE`, `TIME`, `TIMESTAMP`, `INTERVAL`) and `DECIMAL` types
+- Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
+- Outer joins
+- Non-equi joins and Cartesian products
+- Result selection by order position (`ORDER BY OFFSET FETCH`)
+- Grouping sets
+- Set operations except `UNION ALL` (`INTERSECT`, `UNION`, `EXCEPT`)
+
+*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products. Certain rewrites during optimization (e.g., subquery decorrelation) can result in unsupported operations such as outer joins.*
+
+### SQL on Streaming Tables
+
+SQL queries can be executed on streaming Tables (Tables backed by a `DataStream` or `StreamTableSource`) by using the `SELECT STREAM` keywords instead of `SELECT`. Please refer to [Apache Calcite's Streaming SQL documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "user, product, amount");
+// run a SQL query on the Table and retrieve the result as a new Table
+Table result = tableEnv.sql(
+  "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// ingest a DataStream from an external source
+val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
+// run a SQL query on the Table and retrieve the result as a new Table
+val result = tableEnv.sql(
+  "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+</div>
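As described above, the streaming result `Table` can be converted back into a `DataStream`. A sketch continuing the Scala example (assuming the `toDataStream[Row]` conversion of this API version):

{% highlight scala %}
import org.apache.flink.api.table.Row

// convert the streaming result Table back into a DataStream of rows
val stream: DataStream[Row] = result.toDataStream[Row]
stream.print()

env.execute()
{% endhighlight %}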
+
+#### Limitations
+
+The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations and joins are not supported yet.

{% top %}

-Emit a Table to external sinks
+Emitting Tables to External Sinks
----

A `Table` can be emitted to a `TableSink`, which is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ), and others. A batch `Table` can only be emitted by a `BatchTableSink`, a streaming table requires a `StreamTableSink` (a `TableSink` can implement both interfaces).

@@ -739,3 +791,4 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
### Null Handling
By default, the Table API supports `null` values. Null handling can be disabled by setting the `nullCheck` property in the `TableConfig` to `false`.
+{% top %}

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 207500a723472..85ea5e88f8ca3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -252,10 +251,11 @@ abstract class BatchTableEnvironment(
    } catch {
      case e: CannotPlanException =>
-        throw new PlanGenException(
+        throw new TableException(
          s"Cannot generate a valid execution plan for the given query: \n\n" +
            s"${RelOptUtil.toString(relNode)}\n" +
-          "Please consider filing a bug report.", e)
+          s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+          s"Please check the documentation for the set of currently supported SQL features.")
      case a: AssertionError =>
        throw a.getCause
    }

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8ba30002f72e3..004f08ed19a03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -27,7 +27,6 @@ import org.apache.calcite.tools.Programs
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -255,10 +254,11 @@ abstract class StreamTableEnvironment(
    } catch {
      case e: CannotPlanException =>
-        throw new PlanGenException(
+        throw new TableException(
          s"Cannot generate a valid execution plan for the given query: \n\n" +
            s"${RelOptUtil.toString(relNode)}\n" +
-          "Please consider filing a bug report.", e)
+          s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+          s"Please check the documentation for the set of currently supported SQL features.")
    }

    dataStreamPlan match {

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
deleted file mode 100644
index 2fd400da35843..0000000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan
-
-class PlanGenException(message: String, exception: Exception) extends
-  RuntimeException(message: String, exception: Exception){
-
-  def this(message: String){
-    this(message, null)
-  }
-}

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 77e896fce2c93..bdc662a4d2ee8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -26,11 +26,10 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
import org.apache.calcite.sql.fun._
import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
+import org.apache.flink.api.table.typeutils.TypeConverter
import TypeConverter._
import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableConfig}
+import org.apache.flink.api.table.{TableException, Row, TableConfig}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -96,7 +95,7 @@ object AggregateUtil {

    if (groupingOffsetMapping.length != groupings.length ||
        aggOffsetMapping.length != namedAggregates.length) {
-      throw new PlanGenException("Could not find output field in input data type " +
+      throw new TableException("Could not find output field in input data type " +
        "or aggregate functions.")
    }

@@ -138,11 +137,11 @@ object AggregateUtil {
        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
          aggFieldIndexes(index) = 0
        } else {
-          throw new PlanGenException("Aggregate fields should not be empty.")
+          throw new TableException("Aggregate fields should not be empty.")
        }
      } else {
        if (argList.size() > 1) {
-          throw new PlanGenException("Currently, do not support aggregate on multi fields.")
+          throw new TableException("Aggregates on multiple fields are currently not supported.")
        }
        aggFieldIndexes(index) = argList.get(0)
      }
@@ -163,7 +162,7 @@ object AggregateUtil {
          case DOUBLE =>
            new DoubleSumAggregate
          case sqlType: SqlTypeName =>
-            throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
+            throw new TableException("Sum aggregate does not support type: " + sqlType)
        }
      }
      case _: SqlAvgAggFunction => {
@@ -181,7 +180,7 @@ object AggregateUtil {
          case DOUBLE =>
            new DoubleAvgAggregate
          case sqlType: SqlTypeName =>
-            throw new PlanGenException("Avg aggregate does no support type:" + sqlType)
+            throw new TableException("Avg aggregate does not support type: " + sqlType)
        }
      }
      case sqlMinMaxFunction: SqlMinMaxAggFunction => {
@@ -200,7 +199,7 @@ object AggregateUtil {
            case DOUBLE =>
              new DoubleMinAggregate
            case sqlType: SqlTypeName =>
-              throw new PlanGenException("Min aggregate does no support type:" + sqlType)
+              throw new TableException("Min aggregate does not support type: " + sqlType)
          }
        } else {
          sqlTypeName match {
@@ -217,14 +216,14 @@ object AggregateUtil {
            case DOUBLE =>
              new DoubleMaxAggregate
            case sqlType: SqlTypeName =>
-              throw new PlanGenException("Max aggregate does no support type:" + sqlType)
+              throw new TableException("Max aggregate does not support type: " + sqlType)
          }
        }
      }
      case _: SqlCountAggFunction =>
        aggregates(index) = new CountAggregate
      case unSupported: SqlAggFunction =>
-        throw new PlanGenException("unsupported Function: " + unSupported.getName)
+        throw new TableException("Unsupported function: " + unSupported.getName)
    }
    setAggregateDataOffset(index)
  }

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
index d388c33d9d8d5..50a4bdedb1a9a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
@@ -190,7 +189,7 @@ class JoinITCase(
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testJoinNoEqualityPredicate(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
@@ -246,7 +245,7 @@ class JoinITCase(
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testFullOuterJoin(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
@@ -262,7 +261,7 @@ class JoinITCase(
    tEnv.sql(sqlQuery).toDataSet[Row].collect()
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testLeftOuterJoin(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
@@ -278,7 +277,7 @@ class JoinITCase(
    tEnv.sql(sqlQuery).toDataSet[Row].collect()
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testRightOuterJoin(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index ae76acecbf773..126be4b61253f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment}
+import org.apache.flink.api.table.{TableException, ValidationException, Row, TableEnvironment}
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -140,7 +139,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
      .select('c, 'g)
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testNoEqualityJoinPredicate1(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -154,7 +153,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
      .select('c, 'g).collect()
  }

-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[TableException])
  def testNoEqualityJoinPredicate2(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
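To round off: with `PlanGenException` removed, an unsupported SQL feature now surfaces as a `TableException`, as the updated tests expect. A hypothetical sketch (table and field names are invented; the tests above build their inputs from `CollectionDataSets`):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment, TableException}

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// hypothetical tables registered for the SQL query
tEnv.registerDataSet("t1", env.fromElements((1, "a")), 'a, 'c)
tEnv.registerDataSet("t2", env.fromElements((1, "b")), 'd, 'g)

try {
  // outer joins are an unsupported SQL feature on batch tables
  tEnv.sql("SELECT c, g FROM t1 FULL OUTER JOIN t2 ON a = d")
    .toDataSet[Row].collect()
} catch {
  case _: TableException => // expected: the query cannot be translated
}
{% endhighlight %}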