[FLINK-3728] [tableAPI] Improve error message and documentation for unsupported SQL features.
fhueske committed May 22, 2016
1 parent 8ed3685 commit bd8c4c9
Showing 7 changed files with 99 additions and 75 deletions.
99 changes: 76 additions & 23 deletions docs/apis/table.md
@@ -31,7 +31,7 @@ under the License.
The Table API is a SQL-like expression language for relational stream and batch processing that can be easily embedded in Flink's DataSet and DataStream APIs (Java and Scala).
The Table API and SQL interface operate on a relational `Table` abstraction, which can be created from external data sources, or existing DataSets and DataStreams. With the Table API, you can apply relational operators such as selection, aggregation, and joins on `Table`s.

`Table`s can also be queried with regular SQL, as long as they are registered (see [Registering and Accessing Tables](#registering-and-accessing-tables)). The Table API and SQL offer equivalent functionality and can be mixed in the same program. When a `Table` is converted back into a `DataSet` or `DataStream`, the logical plan, which was defined by relational operators and SQL queries, is optimized using [Apache Calcite](https://calcite.apache.org/) and transformed into a `DataSet` or `DataStream` execution plan.
`Table`s can also be queried with regular SQL, as long as they are registered (see [Registering Tables](#registering-tables)). The Table API and SQL offer equivalent functionality and can be mixed in the same program. When a `Table` is converted back into a `DataSet` or `DataStream`, the logical plan, which was defined by relational operators and SQL queries, is optimized using [Apache Calcite](https://calcite.apache.org/) and transformed into a `DataSet` or `DataStream` program.

* This will be replaced by the TOC
{:toc}
@@ -50,15 +50,15 @@ The following dependency must be added to your project in order to use the Table
</dependency>
{% endhighlight %}

Note that the Table API is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
*Note: The Table API is currently not part of the binary distribution. See how to link with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).*


Registering Tables
--------------------------------

`TableEnvironment`s have an internal table catalog to which tables can be registered with a unique name. After registration, a table can be accessed from the `TableEnvironment` by its name. Tables can be registered in different ways.
`TableEnvironment`s have an internal table catalog to which tables can be registered with a unique name. After registration, a table can be accessed from the `TableEnvironment` by its name.

*Note that it is not required to register a `DataSet` or `DataStream` as a table in a `TableEnvironment` in order to process it with the Table API.*
*Note: `DataSet`s or `DataStream`s can be directly converted into `Table`s without registering them in the `TableEnvironment`.*
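For example, a direct conversion might look like this (a minimal Scala sketch, assuming the implicit conversions provided by `org.apache.flink.api.scala.table._`; the example data and field names are illustrative):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// convert the DataSet directly into a Table, no catalog registration needed
val ds: DataSet[(Long, String, Int)] = env.fromElements((1L, "Rubber", 3))
val orders = ds.toTable(tableEnv, 'user, 'product, 'amount)
{% endhighlight %}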

### Register a DataSet

@@ -92,7 +92,7 @@ tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
</div>
</div>

*Note: DataSet table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as these are reserved for internal use only.*
*Note: The name of a `DataSet` `Table` must not match the `^_DataSetTable_[0-9]+` pattern, which is reserved for internal use only.*

### Register a DataStream

@@ -126,7 +126,7 @@ tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)
</div>
</div>

*Note: DataStream table names are not allowed to follow the `^_DataStreamTable_[0-9]+` pattern, as these are reserved for internal use only.*
*Note: The name of a `DataStream` `Table` must not match the `^_DataStreamTable_[0-9]+` pattern, which is reserved for internal use only.*

### Register a Table

@@ -214,7 +214,7 @@ The Table API provides methods to apply relational operations on DataSets and Da

The central concept of the Table API is a `Table`, which represents a table with a relational schema (or relation). Tables can be created from a `DataSet` or `DataStream`, converted into a `DataSet` or `DataStream`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine `Table`s of different `TableEnvironment`s.

*Note that the only operations currently supported on streaming Tables are selection, filtering, and union.*
*Note: The only operations currently supported on streaming Tables are selection, projection, and union.*

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -639,17 +639,13 @@ column names and function names follow Java identifier syntax. Expressions speci

SQL
----
Registered `Table`s can be directly queried with SQL and SQL queries can also be mixed with Table API expressions. Table API and SQL statements will be translated into a single optimized DataStream or DataSet program.
SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`, which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or emitted to a `TableSink` (see [Emitting Tables to External Sinks](#emitting-tables-to-external-sinks)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program.

*Note: The current SQL implementation is not feature complete. Outer joins, distinct aggregates, date and decimal data types are currently not supported. However, all operations supported by the Table API are also supported by SQL.*
A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)).

In order to use a `Table`, `DataSet`, `DataStream`, or external `TableSource` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
A registered table can be accessed from a `TableEnvironment` using the `sql()` method of the `TableEnvironment`:

- `tEnv.sql("SELECT * FROM tName")` executes the SQL query on the tables that were registered in the `TableEnvironment`.

This method returns a new `Table` which can be converted back to a `DataSet` or `DataStream`, or used in subsequent Table API queries.
*Note: Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.*
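For example, a SQL query and Table API operators can be chained on the same result (a sketch that assumes the `Orders` table and `tableEnv` from the batch example in the next section; a query using an unsupported feature, such as an outer join, would instead fail with a `TableException` at translation time):

{% highlight scala %}
// start with SQL, continue with Table API operators on its result
val filtered = tableEnv.sql(
  "SELECT user, product, amount FROM Orders WHERE amount > 2")
val result = filtered
  .groupBy('user)
  .select('user, 'amount.sum as 'total)
{% endhighlight %}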

### SQL on Batch Tables

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -658,11 +654,12 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// read a DataSet from an external source
DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
DataSet<Tuple3<Long, String, Integer>> ds = env.readCsvFile(...);
// register the DataSet as table "Orders"
tableEnv.registerDataSet("Orders", ds, "user, product, amount");
// run a SQL query and retrieve the result in a new Table
Table result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
"SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
</div>

@@ -672,20 +669,75 @@ val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds = env.readCsvFile(...)
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
// register the DataSet under the name "Orders"
tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
// run a SQL query and retrieve the result in a new Table
val result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10")
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
"SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>

SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyword before the table name. Please refer to the [Apache Calcite SQL Streaming documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax.
#### Limitations

The current version supports selection (filter), projection, inner equi-joins, grouping, non-distinct aggregates, and sorting on batch tables.

Among others, the following SQL features are not yet supported:

- Time data types (`DATE`, `TIME`, `TIMESTAMP`, `INTERVAL`) and `DECIMAL` types
- Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
- Outer joins
- Non-equi joins and Cartesian products
- Result selection by order position (`ORDER BY OFFSET FETCH`)
- Grouping sets
- Set operations except `UNION ALL` (`INTERSECT`, `UNION`, `EXCEPT`)

*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products. Certain rewrites during optimization (e.g., subquery decorrelation) can result in unsupported operations such as outer joins.*
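To illustrate the join-order caveat, a supported inner equi-join can be written in `FROM`/`WHERE` form (a sketch assuming a registered `Orders` table and a hypothetical `Products` table; the equality predicate is what keeps this an equi-join rather than a Cartesian product):

{% highlight scala %}
// tables are joined in the order in which they appear in the FROM clause
val joined = tableEnv.sql(
  "SELECT o.user, p.name, o.amount " +
  "FROM Orders o, Products p " +
  "WHERE o.product = p.name")
{% endhighlight %}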

### SQL on Streaming Tables

SQL queries can be executed on streaming Tables (Tables backed by a `DataStream` or a `StreamTableSource`) by using the `SELECT STREAM` keywords instead of `SELECT`. Please refer to [Apache Calcite's Streaming SQL documentation](https://calcite.apache.org/docs/stream.html) for more information on the streaming SQL syntax.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
"SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
"SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>

#### Limitations

The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not supported yet.
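Within these limits, streaming queries can still be combined, e.g., with `UNION ALL` (a sketch assuming a second registered stream `Returns` with the same schema as `Orders`):

{% highlight scala %}
// filter two registered streams and merge them with UNION ALL
val result = tableEnv.sql(
  "SELECT STREAM user, product FROM Orders WHERE amount > 10 " +
  "UNION ALL " +
  "SELECT STREAM user, product FROM Returns WHERE amount > 10")
{% endhighlight %}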

{% top %}

Emit a Table to external sinks
Emitting Tables to External Sinks
----

A `Table` can be emitted to a `TableSink`, a generic interface that supports a wide variety of file formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), and messaging systems (e.g., Apache Kafka, RabbitMQ). A batch `Table` can only be emitted by a `BatchTableSink`, while a streaming `Table` requires a `StreamTableSink` (a `TableSink` can implement both interfaces).
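For example, emitting a batch `Table` as a CSV file might look as follows (a sketch assuming a `CsvTableSink` that implements `BatchTableSink`; the class name, package, and constructor are assumptions based on the 1.x Table API):

{% highlight scala %}
import org.apache.flink.api.table.sinks.CsvTableSink

// write the result Table to a CSV file, using "|" as the field delimiter
result.writeToSink(new CsvTableSink("/tmp/result.csv", fieldDelim = "|"))
{% endhighlight %}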
@@ -739,3 +791,4 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
### Null Handling
By default, the Table API supports `null` values. Null handling can be disabled by setting the `nullCheck` property in the `TableConfig` to `false`.
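A minimal sketch (assuming the `TableConfig` setter and the `getTableEnvironment` overload that accepts a config in this version):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.table.{TableConfig, TableEnvironment}

// create a TableConfig with null checking disabled
val config = new TableConfig()
config.setNullCheck(false)

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env, config)
{% endhighlight %}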

{% top %}
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -252,10 +251,11 @@ abstract class BatchTableEnvironment(
}
catch {
case e: CannotPlanException =>
throw new PlanGenException(
throw new TableException(
s"Cannot generate a valid execution plan for the given query: \n\n" +
s"${RelOptUtil.toString(relNode)}\n" +
"Please consider filing a bug report.", e)
s"This exception indicates that the query uses an unsupported SQL feature.\n" +
s"Please check the documentation for the set of currently supported SQL features.")
case a: AssertionError =>
throw a.getCause
}
@@ -27,7 +27,6 @@ import org.apache.calcite.tools.Programs

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
@@ -255,10 +254,11 @@ abstract class StreamTableEnvironment(
}
catch {
case e: CannotPlanException =>
throw new PlanGenException(
throw new TableException(
s"Cannot generate a valid execution plan for the given query: \n\n" +
s"${RelOptUtil.toString(relNode)}\n" +
"Please consider filing a bug report.", e)
s"This exception indicates that the query uses an unsupported SQL feature.\n" +
s"Please check the documentation for the set of currently supported SQL features.")
}

dataStreamPlan match {

This file was deleted. (It contained the `PlanGenException` class, which this commit replaces with `TableException`.)

@@ -26,11 +26,10 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
import org.apache.calcite.sql.fun._
import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
import org.apache.flink.api.table.typeutils.TypeConverter
import TypeConverter._
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.{Row, TableConfig}
import org.apache.flink.api.table.{TableException, Row, TableConfig}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -96,7 +95,7 @@ object AggregateUtil {

if (groupingOffsetMapping.length != groupings.length ||
aggOffsetMapping.length != namedAggregates.length) {
throw new PlanGenException("Could not find output field in input data type " +
throw new TableException("Could not find output field in input data type " +
"or aggregate functions.")
}

@@ -138,11 +137,11 @@
if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
aggFieldIndexes(index) = 0
} else {
throw new PlanGenException("Aggregate fields should not be empty.")
throw new TableException("Aggregate fields should not be empty.")
}
} else {
if (argList.size() > 1) {
throw new PlanGenException("Currently, do not support aggregate on multi fields.")
throw new TableException("Currently, do not support aggregate on multi fields.")
}
aggFieldIndexes(index) = argList.get(0)
}
@@ -163,7 +162,7 @@
case DOUBLE =>
new DoubleSumAggregate
case sqlType: SqlTypeName =>
throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
throw new TableException("Sum aggregate does no support type:" + sqlType)
}
}
case _: SqlAvgAggFunction => {
Expand All @@ -181,7 +180,7 @@ object AggregateUtil {
case DOUBLE =>
new DoubleAvgAggregate
case sqlType: SqlTypeName =>
throw new PlanGenException("Avg aggregate does no support type:" + sqlType)
throw new TableException("Avg aggregate does no support type:" + sqlType)
}
}
case sqlMinMaxFunction: SqlMinMaxAggFunction => {
Expand All @@ -200,7 +199,7 @@ object AggregateUtil {
case DOUBLE =>
new DoubleMinAggregate
case sqlType: SqlTypeName =>
throw new PlanGenException("Min aggregate does no support type:" + sqlType)
throw new TableException("Min aggregate does no support type:" + sqlType)
}
} else {
sqlTypeName match {
@@ -217,14 +216,14 @@
case DOUBLE =>
new DoubleMaxAggregate
case sqlType: SqlTypeName =>
throw new PlanGenException("Max aggregate does no support type:" + sqlType)
throw new TableException("Max aggregate does no support type:" + sqlType)
}
}
}
case _: SqlCountAggFunction =>
aggregates(index) = new CountAggregate
case unSupported: SqlAggFunction =>
throw new PlanGenException("unsupported Function: " + unSupported.getName)
throw new TableException("unsupported Function: " + unSupported.getName)
}
setAggregateDataOffset(index)
}
