From 63832055301a47dff10c9802b9500040457d9d75 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 17 May 2017 11:31:33 +0200 Subject: [PATCH 1/2] [FLINK-6543] [table] Deprecate toDataStream --- .../examples/scala/StreamSQLExample.scala | 2 +- .../examples/scala/StreamTableExample.scala | 2 +- .../api/java/StreamTableEnvironment.scala | 117 +++++++++++++++++- .../api/scala/StreamTableEnvironment.scala | 55 +++++++- .../table/api/scala/TableConversions.scala | 96 ++++++++++++-- .../flink/table/api/scala/package.scala | 2 +- .../api/scala/stream/TableSourceITCase.scala | 4 +- .../scala/stream/sql/OverWindowITCase.scala | 32 ++--- .../api/scala/stream/sql/SqlITCase.scala | 20 +-- .../api/scala/stream/table/CalcITCase.scala | 24 ++-- .../table/GroupWindowAggregationsITCase.scala | 10 +- .../scala/stream/table/OverWindowITCase.scala | 10 +- .../api/scala/stream/table/UnionITCase.scala | 8 +- .../DataStreamAggregateITCase.scala | 12 +- .../datastream/DataStreamCalcITCase.scala | 4 +- .../DataStreamUserDefinedFunctionITCase.scala | 12 +- .../datastream/TimeAttributesITCase.scala | 12 +- 17 files changed, 328 insertions(+), 94 deletions(-) diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 2cdd8b8971eb4..665913e04a124 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -62,7 +62,7 @@ object StreamSQLExample { "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") - result.toDataStream[Order].print() + result.toAppendStream[Order].print() env.execute() } diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala index 5c5012b57f102..1c0ffeaf44956 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala @@ -55,7 +55,7 @@ object StreamTableExample { val result: DataStream[Order] = orderA.unionAll(orderB) .select('user, 'product, 'amount) .where('amount > 2) - .toDataStream[Order] + .toAppendStream[Order] result.print() diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index 311986c603012..a045793b33bd7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -133,6 +133,111 @@ class StreamTableEnvironment( registerDataStreamInternal(name, dataStream, exprs) } + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the type of the resulting [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = + toAppendStream(table, typeInfo) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the type of the resulting [[DataStream]]. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T]( + table: Table, + clazz: Class[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T]( + table: Table, + typeInfo: TypeInformation[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig) + /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * @@ -149,8 +254,8 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - toDataStream(table, clazz, queryConfig) + def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = { + toAppendStream(table, clazz, queryConfig) } /** @@ -169,8 +274,8 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { - toDataStream(table, typeInfo, queryConfig) + def toAppendStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { + toAppendStream(table, typeInfo, queryConfig) } /** @@ -190,7 +295,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T]( + def toAppendStream[T]( table: Table, clazz: Class[T], queryConfig: StreamQueryConfig): DataStream[T] = { @@ -216,7 +321,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T]( + def toAppendStream[T]( table: Table, typeInfo: TypeInformation[T], queryConfig: StreamQueryConfig): DataStream[T] = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 8c6b27397e770..bfea96dedc086 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -138,13 +138,17 @@ class StreamTableEnvironment( * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * * @param table The [[Table]] to convert. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { - toDataStream(table, queryConfig) - } + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table) /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. @@ -157,12 +161,57 @@ class StreamTableEnvironment( * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * * @param table The [[Table]] to convert. * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation]( + table: Table, + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = { + toAppendStream(table, queryConfig) + } + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toAppendStream[T: TypeInformation]( table: Table, queryConfig: StreamQueryConfig): DataStream[T] = { val returnType = createTypeInformation[T] diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 9874a9ea52e79..1c5a29e9ec266 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -32,7 +32,17 @@ import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa */ class TableConversions(table: Table) { - /** Converts the [[Table]] to a [[DataSet]] of the specified type. */ + /** + * Converts the given [[Table]] into a [[DataSet]] of a specified type. + * + * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. + * + * @tparam T The type of the resulting [[DataSet]]. + * @return The converted [[DataSet]]. + */ def toDataSet[T: TypeInformation]: DataSet[T] = { table.tableEnv match { @@ -44,12 +54,71 @@ class TableConversions(table: Table) { } } - /** Converts the [[Table]] to a [[DataStream]] of the specified type. */ - def toDataStream[T: TypeInformation]: DataStream[T] = { + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = + toAppendStream(queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toAppendStream[T: TypeInformation]: DataStream[T] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toDataStream(table) + tEnv.toAppendStream(table) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + @@ -57,14 +126,25 @@ class TableConversions(table: Table) { } } - /** Converts the [[Table]] to a [[DataStream]] of the specified type. + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * - * @param queryConfig The configuration for the generated query. + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. */ - def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = { + def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toDataStream(table, queryConfig) + tEnv.toAppendStream(table, queryConfig) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala index e8a201777fac6..9d15c1488cee5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala @@ -87,7 +87,7 @@ package object scala extends ImplicitExpressionConversions { implicit def table2RowDataStream(table: Table): DataStream[Row] = { val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv] - tableEnv.toDataStream[Row](table) + tableEnv.toAppendStream[Row](table) } implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala index 9298266bea9bb..13ec2b4b3e1d5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala @@ -46,7 +46,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.sql( "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ") - .toDataStream[Row] + .toAppendStream[Row] .addSink(new StreamITCase.StringSink) env.execute() @@ -71,7 +71,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.scan("csvTable") .where('id > 4) .select('last, 'score * 2) - .toDataStream[Row] + .toAppendStream[Row] .addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala index a7fe1c419b3a7..7ba5c160d2a43 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala @@ -67,7 +67,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() } @@ -92,7 +92,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + "FROM MyTable" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " MIN(c) OVER (" + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + "FROM MyTable" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -177,7 +177,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -204,7 +204,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -234,7 +234,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -259,7 +259,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -321,7 +321,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" + " FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -382,7 +382,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " + "FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -450,7 +450,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " + " FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -511,7 +511,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " + "FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -572,7 +572,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -638,7 +638,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -700,7 +700,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -761,7 +761,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -836,7 +836,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index ba8c1850d1485..bdc1fcc6afd71 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -57,7 +57,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = ds.toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTableRow", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -99,7 +99,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -120,7 +120,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -141,7 +141,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.getSmall3TupleDataStream(env) tEnv.registerDataStream("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -165,7 +165,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -192,7 +192,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -218,7 +218,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.get3TupleDataStream(env) tEnv.registerDataStream("T2", t2, 'a, 'b, 'c) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -275,7 +275,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -305,7 +305,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala index 1114cf06472a2..b355cf053cd32 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala @@ -41,7 +41,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -60,7 +60,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -79,7 +79,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { .select('_1 as 'a, '_2 as 'b, '_1 as 'c) .select('a, 'b) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -99,7 +99,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .select('a, 'b, 'c) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -118,7 +118,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -151,7 +151,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -171,7 +171,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter('a === 3) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -191,7 +191,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( Literal(false) ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -210,7 +210,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( Literal(true) ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -233,7 +233,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( 'a % 2 === 0 ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -257,7 +257,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( 'a % 2 !== 0) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = mutable.MutableList( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala index 846fe3e83991c..d666725caf958 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala @@ -68,7 +68,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -110,7 +110,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -136,7 +136,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select(countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -164,7 +164,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -200,7 +200,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'int2, 'int3, 'string) .select(weightAvgFun('long, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala index b097767202165..f396896aaa2af 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala @@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg) .select('c, 'mycount, 'wAvg) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -123,7 +123,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { 'b.min over 'w, weightAvgFun('b, 'a) over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { val windowedTable = table .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w) .select('a, 'c.sum over 'w, 'c.min over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -241,7 +241,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -304,7 +304,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w) .select('c, 'b, 'a.count over 'w, 'a.sum over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala index f35ee76a0a64f..2b496e33e9298 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala @@ -43,7 +43,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2).select('c) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -63,7 +63,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -82,7 +82,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -101,7 +101,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala index 05e189255abf8..3ac664dfb85ad 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala @@ -69,7 +69,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w) .select('int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -104,7 +104,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -141,7 +141,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -175,7 +175,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -204,7 +204,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -232,7 +232,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = Seq( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala index 1d48f2c57fbb2..12d7202dde900 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala @@ -48,7 +48,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase { .where("RichFunc2(c)='ABC#Hello'") .select('c) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -71,7 +71,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase { .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2") .select('c) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala index b3d9c6f7e95e6..9efe6a14ceae2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala @@ -53,7 +53,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(pojoFunc0('c)) .where('age > 20) .select('c, 'name, 'age) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -70,7 +70,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB val result = t .leftOuterJoin(func0('c) as('d, 'e)) .select('c, 'd, 'e) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -90,7 +90,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(func0('c) as('d, 'e)) .where(Func18('d, "J")) .select('c, 'd, 'e) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -111,7 +111,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(tableFunc1('c) as 's) .select('a, 's) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -135,7 +135,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(tableFunc1(richFunc2('c)) as 's) .select('a, 's) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -164,7 +164,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .select('c, 'd, 'e, 'f, 'g) .join(func32('c) as ('h, 'i)) .select('c, 'd, 'f, 'h, 'e, 'g, 'i) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index 3f12218ed02d2..f2d61754e1ea2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -103,7 +103,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.select('rowtime.cast(Types.STRING)) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { .filter('rowtime.cast(Types.LONG) > 4) .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY)) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -161,7 +161,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -191,7 +191,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.unionAll(table).select('rowtime) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " + "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)") - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -262,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { .groupBy('w2) .select('w2.rowtime, 'w2.end, 'int.count) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() From 64c3947057d80c594d3284af7b40d38f1a0a5070 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 17 May 2017 11:51:30 +0200 Subject: [PATCH 2/2] Make compiler happy --- .../table/api/java/StreamTableEnvironment.scala | 17 +++++++++-------- .../api/scala/StreamTableEnvironment.scala | 4 ++-- .../table/api/scala/TableConversions.scala | 4 ++-- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index a045793b33bd7..514c124740335 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -152,9 +152,10 @@ class StreamTableEnvironment( * @param clazz The class of the type of the resulting [[DataStream]]. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") + @Deprecated def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) /** @@ -176,9 +177,9 @@ class StreamTableEnvironment( * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = toAppendStream(table, typeInfo) @@ -202,9 +203,9 @@ class StreamTableEnvironment( * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T]( table: Table, clazz: Class[T], @@ -230,9 +231,9 @@ class StreamTableEnvironment( * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T]( table: Table, typeInfo: TypeInformation[T], diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index bfea96dedc086..bfd443a576e96 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -146,7 +146,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + @deprecated("This method only supports conversion of append-only tables. In order to make this" + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table) @@ -170,7 +170,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + @deprecated("This method only supports conversion of append-only tables. In order to make this" + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation]( table: Table, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 1c5a29e9ec266..bd431eb81e1aa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -72,7 +72,7 @@ class TableConversions(table: Table) { * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + @deprecated("This method only supports conversion of append-only tables. In order to make this" + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream @@ -95,7 +95,7 @@ class TableConversions(table: Table) { * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - @Deprecated("This method only supports conversion of append-only tables. In order to make this" + + @deprecated("This method only supports conversion of append-only tables. In order to make this" + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(queryConfig)