From 1e0be3100b463eeebec4136ba223adb42ad89253 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Mon, 23 May 2016 16:49:52 +0200 Subject: [PATCH] [FLINK-3955] [tableAPI] Rename Table.toSink() to Table.writeToSink(). --- docs/apis/table.md | 16 ++++++++-------- .../flink/api/table/BatchTableEnvironment.scala | 10 +++++----- .../flink/api/table/StreamTableEnvironment.scala | 10 +++++----- .../flink/api/table/TableEnvironment.scala | 8 ++++---- .../scala/org/apache/flink/api/table/table.scala | 10 +++++----- .../flink/api/scala/batch/TableSinkITCase.scala | 2 +- .../flink/api/scala/stream/TableSinkITCase.scala | 2 +- 7 files changed, 29 insertions(+), 29 deletions(-) diff --git a/docs/apis/table.md b/docs/apis/table.md index 276341db154df..f33ae5920e5a8 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -685,12 +685,12 @@ SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyw {% top %} -Emit a Table to external sinks +Write Tables to external sinks ---- -A `Table` can be emitted to a `TableSink`, which is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ), and others. A batch `Table` can only be emitted by a `BatchTableSink`, a streaming table requires a `StreamTableSink` (a `TableSink` can implement both interfaces). +A `Table` can be written to a `TableSink`, which is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). A batch `Table` can only be written to a `BatchTableSink`, a streaming table requires a `StreamTableSink`. A `TableSink` can implement both interfaces at the same time. -Currently, Flink only provides a `CsvTableSink` that writes a batch or streaming `Table` to CSV-formatted files. A custom `TableSource` can be defined by implementing the `BatchTableSink` and/or `StreamTableSink` interface. +Currently, Flink only provides a `CsvTableSink` that writes a batch or streaming `Table` to CSV-formatted files. A custom `TableSink` can be defined by implementing the `BatchTableSink` and/or `StreamTableSink` interface.
@@ -703,8 +703,8 @@ Table result = ... // create a TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); -// add a TableSink to emit the result Table -result.toSink(sink); +// write the result Table to the TableSink +result.writeToSink(sink); // execute the program env.execute(); @@ -721,8 +721,8 @@ val result: Table = ... // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") -// add a TableSink to emit the result Table -result.toSink(sink) +// write the result Table to the TableSink +result.writeToSink(sink) // execute the program env.execute() @@ -737,5 +737,5 @@ Runtime Configuration The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion. ### Null Handling -By default, the Table API supports `null` values. Null handling can be disabled by setting the `nullCheck` property in the `TableConfig` to `false`. +By default, the Table API supports `null` values. Null handling can be disabled to improve preformance by setting the `nullCheck` property in the `TableConfig` to `false`. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index b25c940fc6097..4c8b37091278e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -136,16 +136,16 @@ abstract class BatchTableEnvironment( } /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the - * [[TableSink]] to emit it. + * [[TableSink]] to write it. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. */ - override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { case batchSink: BatchTableSink[T] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 8ba30002f72e3..bacb587fa2052 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -137,16 +137,16 @@ abstract class StreamTableEnvironment( } /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the - * [[TableSink]] to emit it. + * [[TableSink]] to write it. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ - override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { case streamSink: StreamTableSink[T] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 1c592f92e2184..7debb65821352 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -147,13 +147,13 @@ abstract class TableEnvironment(val config: TableConfig) { def sql(query: String): Table /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The data type that the [[TableSink]] expects. */ - private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit + private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit /** * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 4f111c9ac3529..1e558c5448c2d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -315,16 +315,16 @@ class Table( } /** - * Emits the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. + * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. * - * A batch [[Table]] can only be emitted by a + * A batch [[Table]] can only be written to a * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming [[Table]] requires a * [[org.apache.flink.api.table.sinks.StreamTableSink]]. * - * @param sink The [[TableSink]] to which the [[Table]] is emitted. + * @param sink The [[TableSink]] to which the [[Table]] is written. * @tparam T The data type that the [[TableSink]] expects. */ - def toSink[T](sink: TableSink[T]): Unit = { + def writeToSink[T](sink: TableSink[T]): Unit = { // get schema information of table val rowType = getRelNode.getRowType @@ -336,7 +336,7 @@ class Table( val configuredSink = sink.configure(fieldNames, fieldTypes) // emit the table to the configured table sink - tableEnv.emitToSink(this, configuredSink) + tableEnv.writeToSink(this, configuredSink) } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala index 39684ff914feb..dd0668c5108ed 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -59,7 +59,7 @@ class TableSinkITCase( val results = input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) - .toSink(new CsvTableSink(path, fieldDelim = "|")) + .writeToSink(new CsvTableSink(path, fieldDelim = "|")) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala index 66cb9bf407899..160d88a5fea1a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala @@ -53,7 +53,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { val results = input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) - .toSink(new CsvTableSink(path)) + .writeToSink(new CsvTableSink(path)) env.execute()