From 4a70d1f4c6f9b0a27d7fc28c39cbd72256272074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Wed, 16 Oct 2024 11:00:36 +0530 Subject: [PATCH 01/12] [FLINK-36488] [TABLE SQL/API] Remove deprecated methods StreamTableEnvironment.toAppendStream from flink-table-api-java-bridge module --- .../bridge/java/StreamTableEnvironment.java | 55 ------------------- .../internal/StreamTableEnvironmentImpl.java | 17 ------ 2 files changed, 72 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 86610a431fb1a..21b21f1edd782 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -864,61 +864,6 @@ DataStream toChangelogStream( * be used as source of truth. */ @Deprecated - DataStream toAppendStream(Table table, Class clazz); - - /** - * Converts the given {@link Table} into an append {@link DataStream} of a specified type. - * - *

<p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
- * modified by update or delete changes, the conversion will fail.
- *
- * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
- *
- * <ul>
- *   <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
- *       by position, field types must match.
- *   <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
- * </ul>
- *
- * @param table The {@link Table} to convert.
- * @param typeInfo The {@link TypeInformation} that specifies the type of the {@link
- *     DataStream}.
- * @param <T> The type of the resulting {@link DataStream}.
- * @return The converted {@link DataStream}.
- * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type
- *     system and supports all kinds of {@link DataTypes} that the table runtime can produce.
- *     The semantics might be slightly different for raw and structured types. Use {@code
- *     toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should
- *     be used as source of truth.
- */
- @Deprecated
- <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
-
- /**
- * Converts the given {@link Table} into a {@link DataStream} of add and retract messages. The
- * message will be encoded as {@link Tuple2}. The first field is a {@link Boolean} flag, the
- * second field holds the record of the specified type {@link T}.
- *
- * <p>A true {@link Boolean} flag indicates an add message, a false flag indicates a retract
- * message.
- *
- * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
- *
- * <ul>
- *   <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
- *       by position, field types must match.
- *   <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
- * </ul>
- * - * @param table The {@link Table} to convert. - * @param clazz The class of the requested record type. - * @param The type of the requested record type. - * @return The converted {@link DataStream}. - * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It integrates with the new - * type system and supports all kinds of {@link DataTypes} and every {@link ChangelogMode} - * that the table runtime can produce. - */ - @Deprecated DataStream> toRetractStream(Table table, Class clazz); /** diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 89aa1238d22d3..490d91138ee04 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -58,7 +58,6 @@ import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.MutableURLClassLoader; @@ -358,22 +357,6 @@ public void createTemporaryView( createTemporaryView(path, fromDataStream(dataStream, fields)); } - @Override - public DataStream toAppendStream(Table table, Class clazz) { - TypeInformation typeInfo = extractTypeInformation(table, clazz); - return toAppendStream(table, typeInfo); - } - - @Override - public DataStream toAppendStream(Table table, TypeInformation typeInfo) { - OutputConversionModifyOperation modifyOperation = - new OutputConversionModifyOperation( - table.getQueryOperation(), - TypeConversions.fromLegacyInfoToDataType(typeInfo), - OutputConversionModifyOperation.UpdateMode.APPEND); - return toStreamInternal(table, modifyOperation); - } - @Override public DataStream> toRetractStream(Table table, Class clazz) { TypeInformation typeInfo = extractTypeInformation(table, clazz); From 367617543acd57a597943f0669229b7c44125ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Sun, 12 Jan 2025 21:31:12 +0530 Subject: [PATCH 02/12] removing python test case having dependency on the removed deprecated method --- .../test_stream_execution_environment.py | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 03d43e5fb3fe1..63c34234ab402 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -260,57 +260,6 @@ def test_execute_async(self): execution_result = job_client.get_job_execution_result().result() self.assertEqual(str(job_id), str(execution_result.get_job_id())) - def test_add_python_file(self): - import uuid - env = self.env - python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) - os.mkdir(python_file_dir) - python_file_path = os.path.join(python_file_dir, "test_dep1.py") - with open(python_file_path, 'w') as f: - f.write("def 
add_two(a):\n return a + 2") - - def plus_two_map(value): - from test_dep1 import add_two - return add_two(value) - - get_j_env_configuration(env._j_stream_execution_environment).\ - setString("taskmanager.numberOfTaskSlots", "10") - env.add_python_file(python_file_path) - ds = env.from_collection([1, 2, 3, 4, 5]) - ds = ds.map(plus_two_map, Types.LONG()) \ - .slot_sharing_group("data_stream") \ - .map(lambda i: i, Types.LONG()) \ - .slot_sharing_group("table") - - python_file_path = os.path.join(python_file_dir, "test_dep2.py") - with open(python_file_path, 'w') as f: - f.write("def add_three(a):\n return a + 3") - - def plus_three(value): - from test_dep2 import add_three - return add_three(value) - - t_env = StreamTableEnvironment.create( - stream_execution_environment=env, - environment_settings=EnvironmentSettings.in_streaming_mode()) - env.add_python_file(python_file_path) - - from pyflink.table.udf import udf - from pyflink.table.expressions import col - add_three = udf(plus_three, result_type=DataTypes.BIGINT()) - - tab = t_env.from_data_stream(ds, col('a')) \ - .select(add_three(col('a'))) - t_env.to_append_stream(tab, Types.ROW([Types.LONG()])) \ - .map(lambda i: i[0]) \ - .add_sink(self.test_sink) - env.execute("test add_python_file") - result = self.test_sink.get_results(True) - expected = ['6', '7', '8', '9', '10'] - result.sort() - expected.sort() - self.assertEqual(expected, result) - def test_add_python_file_2(self): import uuid env = self.env From 8c869c83e2b9d4c7395743a16258f84d7a24a1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Mon, 13 Jan 2025 20:40:27 +0530 Subject: [PATCH 03/12] removed toAppendStream from Scala StreamTableEnvironment and python test dependent on the method --- .../pyflink.table/table_environment.rst | 1 - .../table/tests/test_table_environment_api.py | 17 ---------- .../bridge/scala/StreamTableEnvironment.scala | 26 -------------- .../api/bridge/scala/TableConversions.scala | 34 ------------------- .../internal/StreamTableEnvironmentImpl.scala | 10 ------ 5 files changed, 88 deletions(-) diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst b/flink-python/docs/reference/pyflink.table/table_environment.rst index 46a7060be2568..3ab8a3bf65bda 100644 --- a/flink-python/docs/reference/pyflink.table/table_environment.rst +++ b/flink-python/docs/reference/pyflink.table/table_environment.rst @@ -251,7 +251,6 @@ StreamTableEnvironment StreamTableEnvironment.sql_query StreamTableEnvironment.to_data_stream StreamTableEnvironment.to_changelog_stream - StreamTableEnvironment.to_append_stream StreamTableEnvironment.to_retract_stream StreamTableEnvironment.unload_module StreamTableEnvironment.use_catalog diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 5fe20d941a72a..74c82128d1ea9 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -599,23 +599,6 @@ def test_from_and_to_changelog_stream_event_time(self): actual_results.sort() self.assertEqual(expected_results, actual_results) - def test_to_append_stream(self): - self.env.set_parallelism(1) - t_env = StreamTableEnvironment.create( - self.env, - environment_settings=EnvironmentSettings.in_streaming_mode()) - table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"]) - new_table = table.select(table.a + 1, table.b + 
'flink', table.c) - ds = t_env.to_append_stream(table=new_table, type_info=Types.ROW([Types.LONG(), - Types.STRING(), - Types.STRING()])) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) - self.env.execute("test_to_append_stream") - result = test_sink.get_results(False) - expected = ['+I[2, Hiflink, Hello]', '+I[3, Helloflink, Hi]'] - self.assertEqual(result, expected) - def test_to_retract_stream(self): self.env.set_parallelism(1) t_env = StreamTableEnvironment.create( diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index e5d33d6024441..b25a17a0faf8c 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -721,32 +721,6 @@ trait StreamTableEnvironment extends TableEnvironment { @deprecated def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified by - * update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[Row]] and Scala Tuple types: Fields are mapped by position, field types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * @param table - * The [[Table]] to convert. - * @tparam T - * The type of the resulting [[DataStream]]. - * @return - * The converted [[DataStream]]. - * @deprecated - * Use [[toDataStream(Table, Class)]] instead. It integrates with the new type system and - * supports all kinds of [[DataTypes]] that the table runtime can produce. The semantics might - * be slightly different for raw and structured types. Use - * `toDataStream(DataTypes.of(Types.of[Class]))` if [[TypeInformation]] should be used as source - * of truth. - */ - @deprecated - def toAppendStream[T: TypeInformation](table: Table): DataStream[T] - /** * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. The message * will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second field holds diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala index bf33ecfddd37f..7ffb79685bc20 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala @@ -189,40 +189,6 @@ class TableConversions(table: Table) { // Legacy before FLIP-136 // ---------------------------------------------------------------------------------------------- - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified by - * update or delete changes, the conversion will fail. 
- * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field - * types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * @tparam T - * The type of the resulting [[DataStream]]. - * @return - * The converted [[DataStream]]. - * @deprecated - * Use [[toDataStream(Table, Class)]] instead. It integrates with the new type system and - * supports all kinds of [[DataTypes]] that the table runtime can produce. The semantics might - * be slightly different for raw and structured types. Use - * `toDataStream(DataTypes.of(Types.of[Class]))` if [[TypeInformation]] should be used as source - * of truth. - */ - @deprecated - def toAppendStream[T: TypeInformation]: DataStream[T] = { - internalEnv match { - case tEnv: StreamTableEnvironment => - tEnv.toAppendStream(table) - case _ => - throw new ValidationException( - "Only tables that originate from Scala DataStreams " + - "can be converted to Scala DataStreams.") - } - } - /** * Converts the [[Table]] to a [[DataStream]] of add and retract messages. The message will be * encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second field holds the record diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 60b29a9aeafbc..0c3beb544875b 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -193,16 +193,6 @@ class StreamTableEnvironmentImpl( createTable(queryOperation) } - override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = { - val returnType = createTypeInformation[T] - - val modifyOperation = new OutputConversionModifyOperation( - table.getQueryOperation, - TypeConversions.fromLegacyInfoToDataType(returnType), - OutputConversionModifyOperation.UpdateMode.APPEND) - toStreamInternal[T](table, modifyOperation) - } - override def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { val returnType = createTypeInformation[(Boolean, T)] From 7cfb0ba14702e5bc5de7947aaef4e83eb412b71a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Tue, 14 Jan 2025 00:34:37 +0530 Subject: [PATCH 04/12] removed toAppendStream method reference from WindowAggregateITCase --- .../stream/sql/WindowAggregateITCase.scala | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 0986d390a22df..36d13c00b52a2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -1290,47 +1290,6 @@ class WindowAggregateITCase( 
.isEqualTo(expected.sorted.mkString("\n")) } - @TestTemplate - def testDistinctAggWithMergeOnEventTimeSessionWindow(): Unit = { - // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge - val sessionWindowTestData = List( - (1L, 2, "Hello"), // (1, Hello) - window - (2L, 2, "Hello"), // (1, Hello) - window, deduped - (8L, 2, "Hello"), // (2, Hello) - window, deduped during merge - (10L, 3, "Hello"), // (2, Hello) - window, forwarded during merge - (9L, 9, "Hello World"), // (1, Hello World) - window - (4L, 1, "Hello"), // (1, Hello) - window, triggering merge - (16L, 16, "Hello") - ) // (3, Hello) - window (not merged) - - val stream = failingDataSource(sessionWindowTestData) - .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) - val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) - tEnv.registerTable("MyTable", table) - - val sqlQuery = - """ - |SELECT c, - | COUNT(DISTINCT b), - | window_end - |FROM TABLE( - | SESSION(TABLE MyTable PARTITION BY c, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND)) - |GROUP BY c, window_start, window_end - """.stripMargin - val sink = new TestingAppendSink - tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) - env.execute() - - val expected = Seq( - "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till {14L} - "Hello,1,1970-01-01T00:00:00.021", // window starts at [16L] till {21L}, not merged - "Hello,3,1970-01-01T00:00:00.015" // window starts at [1L,2L], - // merged with [8L,10L], by [4L], till {15L} - ) - assertThat(sink.getAppendResults.sorted.mkString("\n")) - .isEqualTo(expected.sorted.mkString("\n")) - } - @TestTemplate def testPercentileOnEventTimeTumbleWindow(): Unit = { val sql = From 721188e4eb029ecfc61cc81b270977ade1e2f40e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Tue, 14 Jan 2025 22:46:38 +0530 Subject: [PATCH 05/12] cleanup for toAppendStream and to_append_stream from docs and fixed review comments. --- .../datastream/intro_to_datastream_api.md | 36 ----------- .../python/datastream/operators/overview.md | 61 ------------------- .../docs/dev/table/data_stream_api.md | 14 ----- .../datastream/intro_to_datastream_api.md | 36 ----------- .../python/datastream/operators/overview.md | 61 ------------------- .../content/docs/dev/table/data_stream_api.md | 20 ------ .../pyflink/table/table_environment.py | 18 ------ .../bridge/java/StreamTableEnvironment.java | 21 +++---- 8 files changed, 10 insertions(+), 257 deletions(-) diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md index d87e1f4ee2e9c..ab1d421a38126 100644 --- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md @@ -204,40 +204,6 @@ ds = env.from_source( Note The `DataStream` created using `from_source` could be executed in both `batch` and `streaming` executing mode. -### Create using Table & SQL connectors - -Table & SQL connectors could also be used to create a `DataStream`. You could firstly create a -`Table` using Table & SQL connectors and then convert it to a `DataStream`. 
- -```python -from pyflink.common.typeinfo import Types -from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import StreamTableEnvironment - -env = StreamExecutionEnvironment.get_execution_environment() -t_env = StreamTableEnvironment.create(stream_execution_environment=env) - -t_env.execute_sql(""" - CREATE TABLE my_source ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'datagen', - 'number-of-rows' = '10' - ) - """) - -ds = t_env.to_append_stream( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) -``` - -Note The StreamExecutionEnvironment `env` should be specified -when creating the TableEnvironment `t_env`. - -Note As all the Java Table & SQL connectors could be used in -PyFlink Table API, this means that all of them could also be used in PyFlink DataStream API. - {{< top >}} DataStream Transformations @@ -266,8 +232,6 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()])) -# or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md index f482e7308fac0..77b7ee6e706a3 100644 --- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md @@ -86,67 +86,6 @@ For more details about the pickle serializer, please refer to [Pickle Serializat Generally, the output type needs to be specified in the following scenarios. -### Convert DataStream into Table - -```python -from pyflink.common.typeinfo import Types -from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import StreamTableEnvironment - - -def data_stream_api_demo(): - env = StreamExecutionEnvironment.get_execution_environment() - t_env = StreamTableEnvironment.create(stream_execution_environment=env) - - t_env.execute_sql(""" - CREATE TABLE my_source ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'datagen', - 'number-of-rows' = '10' - ) - """) - - ds = t_env.to_append_stream( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) - - def split(s): - splits = s[1].split("|") - for sp in splits: - yield s[0], sp - - ds = ds.map(lambda i: (i[0] + 1, i[1])) \ - .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ - .key_by(lambda i: i[1]) \ - .reduce(lambda i, j: (i[0] + j[0], i[1])) - - t_env.execute_sql(""" - CREATE TABLE my_sink ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'print' - ) - """) - - table = t_env.from_data_stream(ds) - table_result = table.execute_insert("my_sink") - - # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出 - # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法 - table_result.wait() - - -if __name__ == '__main__': - data_stream_api_demo() -``` - -The output type must be specified for the flat_map operation in the above example which will be used as -the output type of the reduce operation implicitly. The reason is that -`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type. 
- ### Write DataStream to Sink ```python diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index 7cbb8551e4dd8..a1454363c2a75 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -3150,13 +3150,6 @@ Table table = tableEnv.fromValues( row("john", 35), row("sarah", 32)); -// Convert the Table into an append DataStream of Row by specifying the class -DataStream dsRow = tableEnv.toAppendStream(table, Row.class); - -// Convert the Table into an append DataStream of Tuple2 with TypeInformation -TupleTypeInfo> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT()); -DataStream> dsTuple = tableEnv.toAppendStream(table, tupleType); - // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. @@ -3177,13 +3170,6 @@ val table: Table = tableEnv.fromValues( row("john", 35), row("sarah", 32)) -// Convert the Table into an append DataStream of Row by specifying the class -val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) - -// Convert the Table into an append DataStream of (String, Integer) with TypeInformation -val dsTuple: DataStream[(String, Int)] dsTuple = - tableEnv.toAppendStream[(String, Int)](table) - // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md index 61b0d56b45198..5c867dba82f01 100644 --- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md @@ -204,40 +204,6 @@ ds = env.from_source( Note The `DataStream` created using `from_source` could be executed in both `batch` and `streaming` executing mode. -### Create using Table & SQL connectors - -Table & SQL connectors could also be used to create a `DataStream`. You could firstly create a -`Table` using Table & SQL connectors and then convert it to a `DataStream`. - -```python -from pyflink.common.typeinfo import Types -from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import StreamTableEnvironment - -env = StreamExecutionEnvironment.get_execution_environment() -t_env = StreamTableEnvironment.create(stream_execution_environment=env) - -t_env.execute_sql(""" - CREATE TABLE my_source ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'datagen', - 'number-of-rows' = '10' - ) - """) - -ds = t_env.to_append_stream( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) -``` - -Note The StreamExecutionEnvironment `env` should be specified -when creating the TableEnvironment `t_env`. - -Note As all the Java Table & SQL connectors could be used in -PyFlink Table API, this means that all of them could also be used in PyFlink DataStream API. - {{< top >}} DataStream Transformations @@ -266,8 +232,6 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()])) -# or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md index 6fa39c376387a..6846e94134488 100644 --- a/docs/content/docs/dev/python/datastream/operators/overview.md +++ b/docs/content/docs/dev/python/datastream/operators/overview.md @@ -86,67 +86,6 @@ For more details about the pickle serializer, please refer to [Pickle Serializat Generally, the output type needs to be specified in the following scenarios. -### Convert DataStream into Table - -```python -from pyflink.common.typeinfo import Types -from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import StreamTableEnvironment - - -def data_stream_api_demo(): - env = StreamExecutionEnvironment.get_execution_environment() - t_env = StreamTableEnvironment.create(stream_execution_environment=env) - - t_env.execute_sql(""" - CREATE TABLE my_source ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'datagen', - 'number-of-rows' = '10' - ) - """) - - ds = t_env.to_append_stream( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) - - def split(s): - splits = s[1].split("|") - for sp in splits: - yield s[0], sp - - ds = ds.map(lambda i: (i[0] + 1, i[1])) \ - .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ - .key_by(lambda i: i[1]) \ - .reduce(lambda i, j: (i[0] + j[0], i[1])) - - t_env.execute_sql(""" - CREATE TABLE my_sink ( - a INT, - b VARCHAR - ) WITH ( - 'connector' = 'print' - ) - """) - - table = t_env.from_data_stream(ds) - table_result = table.execute_insert("my_sink") - - # 1)wait for job finishes and only used in local execution, otherwise, it may happen that the script exits with the job is still running - # 2)should be removed when submitting the job to a remote cluster such as YARN, standalone, K8s etc in detach mode - table_result.wait() - - -if __name__ == '__main__': - data_stream_api_demo() -``` - -The output type must be specified for the flat_map operation in the above example which will be used as -the output type of the reduce operation implicitly. The reason is that -`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type. - ### Write DataStream to Sink ```python diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 6932ffcdd473d..c0d941d8ca6ae 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3141,13 +3141,6 @@ Table table = tableEnv.fromValues( row("john", 35), row("sarah", 32)); -// Convert the Table into an append DataStream of Row by specifying the class -DataStream dsRow = tableEnv.toAppendStream(table, Row.class); - -// Convert the Table into an append DataStream of Tuple2 with TypeInformation -TupleTypeInfo> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT()); -DataStream> dsTuple = tableEnv.toAppendStream(table, tupleType); - // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. 
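Reviewer note on the hunk above: `toDataStream` is the Java-side replacement for the removed `toAppendStream` calls and keeps their insert-only semantics (the conversion fails on tables that also produce updates or deletes). A minimal sketch, assuming the `tableEnv` and `table` from the surrounding documentation example:

```java
// Insert-only conversion; Row is the default record type.
DataStream<Row> dsRow = tableEnv.toDataStream(table);

// Equivalent of the removed toAppendStream(table, Row.class).
DataStream<Row> dsRowExplicit = tableEnv.toDataStream(table, Row.class);
```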
@@ -3168,13 +3161,6 @@ val table: Table = tableEnv.fromValues( row("john", 35), row("sarah", 32)) -// Convert the Table into an append DataStream of Row by specifying the class -val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) - -// Convert the Table into an append DataStream of (String, Integer) with TypeInformation -val dsTuple: DataStream[(String, Int)] dsTuple = - tableEnv.toAppendStream[(String, Int)](table) - // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. @@ -3193,12 +3179,6 @@ table = t_env.from_elements([("john", 35), ("sarah", 32)], DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT())])) -# Convert the Table into an append DataStream of Row by specifying the type information -ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()])) - -# Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation -ds_tuple = t_env.to_append_stream(table, Types.TUPLE([Types.STRING(), Types.INT()])) - # Convert the Table into a retract DataStream of Row by specifying the type information # A retract stream of type X is a DataStream of Tuple[bool, X]. # The boolean field indicates the type of the change. diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 7f6784f59b5cc..4e1d3e68ceb3c 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1878,24 +1878,6 @@ def to_changelog_stream(self, target_schema._j_schema, changelog_mode._j_changelog_mode)) - def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream: - """ - Converts the given Table into a DataStream of a specified type. The Table must only have - insert (append) changes. If the Table is also modified by update or delete changes, the - conversion will fail. - - The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are - mapped by position, field types must match. - - :param table: The Table to convert. - :param type_info: The TypeInformation that specifies the type of the DataStream. - :return: The converted DataStream. - - .. versionadded:: 1.12.0 - """ - j_data_stream = self._j_tenv.toAppendStream(table._j_table, type_info.get_java_type_info()) - return DataStream(j_data_stream=j_data_stream) - def to_retract_stream(self, table: Table, type_info: TypeInformation) -> DataStream: """ Converts the given Table into a DataStream of add and retract messages. The message will be diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 07bc413839c20..bf7f1ba466a4b 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -764,10 +764,12 @@ DataStream toChangelogStream( void createTemporaryView(String path, DataStream dataStream, Expression... fields); /** - * Converts the given {@link Table} into an append {@link DataStream} of a specified type. + * Converts the given {@link Table} into a {@link DataStream} of add and retract messages. 
The + * message will be encoded as {@link Tuple2}. The first field is a {@link Boolean} flag, the + * second field holds the record of the specified type {@link T}. * - *

<p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
- * modified by update or delete changes, the conversion will fail.
+ * <p>A true {@link Boolean} flag indicates an add message, a false flag indicates a retract
+ * message.
 *
 * <p>
The fields of the {@link Table} are mapped to {@link DataStream} fields as follows: * @@ -778,16 +780,13 @@ DataStream toChangelogStream( * * * @param table The {@link Table} to convert. - * @param clazz The class of the type of the resulting {@link DataStream}. - * @param The type of the resulting {@link DataStream}. + * @param clazz The class of the requested record type. + * @param The type of the requested record type. * @return The converted {@link DataStream}. - * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type - * system and supports all kinds of {@link DataTypes} that the table runtime can produce. - * The semantics might be slightly different for raw and structured types. Use {@code - * toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should - * be used as source of truth. + * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It integrates with the new + * type system and supports all kinds of {@link DataTypes} and every {@link ChangelogMode} + * that the table runtime can produce. */ - @Deprecated DataStream> toRetractStream(Table table, Class clazz); /** From 03443a122796a0f9dc612bafc5c0ba78ceb93543 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Tue, 14 Jan 2025 22:54:33 +0530 Subject: [PATCH 06/12] fixed review comments --- .../flink/table/api/bridge/java/StreamTableEnvironment.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index bf7f1ba466a4b..199b1ce3f1614 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -787,6 +787,7 @@ DataStream toChangelogStream( * type system and supports all kinds of {@link DataTypes} and every {@link ChangelogMode} * that the table runtime can produce. 
*/ + @Deprecated DataStream> toRetractStream(Table table, Class clazz); /** From 24bc5fd31d9fc32640f41aaffeda467a6044a899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Wed, 15 Jan 2025 12:18:06 +0530 Subject: [PATCH 07/12] Fixed review comments and cleanup for deprecated method --- .../datastream/operators/process_function.md | 2 +- .../datastream/intro_to_datastream_api.md | 36 +++++++++++ .../python/datastream/operators/overview.md | 61 +++++++++++++++++++ .../docs/dev/table/data_stream_api.md | 14 +++++ .../datastream/operators/process_function.md | 2 +- .../datastream/intro_to_datastream_api.md | 36 +++++++++++ .../python/datastream/operators/overview.md | 61 +++++++++++++++++++ .../content/docs/dev/table/data_stream_api.md | 14 +++++ .../test_stream_execution_environment.py | 51 ++++++++++++++++ .../pyflink/table/table_environment.py | 23 +++++++ .../FlinkChangelogModeInferenceProgram.scala | 2 +- .../stream/sql/WindowAggregateITCase.scala | 41 +++++++++++++ 12 files changed, 340 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index dfa49a624cde0..d5e6957439df4 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -313,7 +313,7 @@ if __name__ == '__main__': ) """) - stream = t_env.to_append_stream( + stream = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md index ab1d421a38126..514b22d2d97d7 100644 --- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md @@ -204,6 +204,40 @@ ds = env.from_source( Note The `DataStream` created using `from_source` could be executed in both `batch` and `streaming` executing mode. +### Create using Table & SQL connectors + +Table & SQL connectors could also be used to create a `DataStream`. You could firstly create a +`Table` using Table & SQL connectors and then convert it to a `DataStream`. + +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(stream_execution_environment=env) + +t_env.execute_sql(""" + CREATE TABLE my_source ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ) + """) + +ds = t_env.toDataStream( + t_env.from_path('my_source'), + Types.ROW([Types.INT(), Types.STRING()])) +``` + +Note The StreamExecutionEnvironment `env` should be specified +when creating the TableEnvironment `t_env`. + +Note As all the Java Table & SQL connectors could be used in +PyFlink Table API, this means that all of them could also be used in PyFlink DataStream API. + {{< top >}} DataStream Transformations @@ -232,6 +266,8 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream +ds = t_env.toDataStream(table, Types.ROW([Types.INT(), Types.STRING()])) +# or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md index 77b7ee6e706a3..1641f825ee5cc 100644 --- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md @@ -86,6 +86,67 @@ For more details about the pickle serializer, please refer to [Pickle Serializat Generally, the output type needs to be specified in the following scenarios. +### Convert DataStream into Table + +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment + + +def data_stream_api_demo(): + env = StreamExecutionEnvironment.get_execution_environment() + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.execute_sql(""" + CREATE TABLE my_source ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ) + """) + + ds = t_env.toDataStream( + t_env.from_path('my_source'), + Types.ROW([Types.INT(), Types.STRING()])) + + def split(s): + splits = s[1].split("|") + for sp in splits: + yield s[0], sp + + ds = ds.map(lambda i: (i[0] + 1, i[1])) \ + .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ + .key_by(lambda i: i[1]) \ + .reduce(lambda i, j: (i[0] + j[0], i[1])) + + t_env.execute_sql(""" + CREATE TABLE my_sink ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'print' + ) + """) + + table = t_env.from_data_stream(ds) + table_result = table.execute_insert("my_sink") + + # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出 + # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法 + table_result.wait() + + +if __name__ == '__main__': + data_stream_api_demo() +``` + +The output type must be specified for the flat_map operation in the above example which will be used as +the output type of the reduce operation implicitly. The reason is that +`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type. + ### Write DataStream to Sink ```python diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index a1454363c2a75..7c5157b78dac7 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -3150,6 +3150,13 @@ Table table = tableEnv.fromValues( row("john", 35), row("sarah", 32)); +// Convert the Table into an append DataStream of Row by specifying the class +DataStream dsRow = tableEnv.toDataStream(table, Row.class); + +// Convert the Table into an append DataStream of Tuple2 with TypeInformation +TupleTypeInfo> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT()); +DataStream> dsTuple = tableEnv.toDataStream(table, tupleType); + // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. 
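Reviewer note: the re-added tuple example above hands a `TypeInformation` directly to `toDataStream`, but the bridge only offers the `toDataStream(Table)`, `toDataStream(Table, Class)` and `toDataStream(Table, AbstractDataType)` variants. Following the hint in the `@deprecated` Javadoc quoted earlier in this series, the `TypeInformation` presumably needs to be wrapped first — a sketch, assuming the same `tableEnv` and `table` (with `Types` referring to `org.apache.flink.api.common.typeinfo.Types`):

```java
// Wrap the TypeInformation in a DataType before handing it to toDataStream,
// as the @deprecated note on toAppendStream suggested.
TupleTypeInfo<Tuple2<String, Integer>> tupleType =
        new TupleTypeInfo<>(Types.STRING, Types.INT);
DataStream<Tuple2<String, Integer>> dsTuple =
        tableEnv.toDataStream(table, DataTypes.of(tupleType));
```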
@@ -3170,6 +3177,13 @@ val table: Table = tableEnv.fromValues( row("john", 35), row("sarah", 32)) +// Convert the Table into an append DataStream of Row by specifying the class +val dsRow: DataStream[Row] = tableEnv.toDataStream[Row](table) + +// Convert the Table into an append DataStream of (String, Integer) with TypeInformation +val dsTuple: DataStream[(String, Int)] dsTuple = + tableEnv.toDataStream[(String, Int)](table) + // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md index 990f6a5dfd7c0..7a68276bd69d4 100644 --- a/docs/content/docs/dev/datastream/operators/process_function.md +++ b/docs/content/docs/dev/datastream/operators/process_function.md @@ -320,7 +320,7 @@ if __name__ == '__main__': ) """) - stream = t_env.to_append_stream( + stream = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md index 5c867dba82f01..7d7a75cce0e11 100644 --- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md @@ -204,6 +204,40 @@ ds = env.from_source( Note The `DataStream` created using `from_source` could be executed in both `batch` and `streaming` executing mode. +### Create using Table & SQL connectors + +Table & SQL connectors could also be used to create a `DataStream`. You could firstly create a +`Table` using Table & SQL connectors and then convert it to a `DataStream`. + +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(stream_execution_environment=env) + +t_env.execute_sql(""" + CREATE TABLE my_source ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ) + """) + +ds = t_env.toDataStream( + t_env.from_path('my_source'), + Types.ROW([Types.INT(), Types.STRING()])) +``` + +Note The StreamExecutionEnvironment `env` should be specified +when creating the TableEnvironment `t_env`. + +Note As all the Java Table & SQL connectors could be used in +PyFlink Table API, this means that all of them could also be used in PyFlink DataStream API. + {{< top >}} DataStream Transformations @@ -232,6 +266,8 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream +ds = t_env.toDataStream(table, Types.ROW([Types.INT(), Types.STRING()])) +# or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md index 6846e94134488..e5e1ef6df1e71 100644 --- a/docs/content/docs/dev/python/datastream/operators/overview.md +++ b/docs/content/docs/dev/python/datastream/operators/overview.md @@ -86,6 +86,67 @@ For more details about the pickle serializer, please refer to [Pickle Serializat Generally, the output type needs to be specified in the following scenarios. +### Convert DataStream into Table + +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment + + +def data_stream_api_demo(): + env = StreamExecutionEnvironment.get_execution_environment() + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.execute_sql(""" + CREATE TABLE my_source ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ) + """) + + ds = t_env.toDataStream( + t_env.from_path('my_source'), + Types.ROW([Types.INT(), Types.STRING()])) + + def split(s): + splits = s[1].split("|") + for sp in splits: + yield s[0], sp + + ds = ds.map(lambda i: (i[0] + 1, i[1])) \ + .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ + .key_by(lambda i: i[1]) \ + .reduce(lambda i, j: (i[0] + j[0], i[1])) + + t_env.execute_sql(""" + CREATE TABLE my_sink ( + a INT, + b VARCHAR + ) WITH ( + 'connector' = 'print' + ) + """) + + table = t_env.from_data_stream(ds) + table_result = table.execute_insert("my_sink") + + # 1)wait for job finishes and only used in local execution, otherwise, it may happen that the script exits with the job is still running + # 2)should be removed when submitting the job to a remote cluster such as YARN, standalone, K8s etc in detach mode + table_result.wait() + + +if __name__ == '__main__': + data_stream_api_demo() +``` + +The output type must be specified for the flat_map operation in the above example which will be used as +the output type of the reduce operation implicitly. The reason is that +`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type. + ### Write DataStream to Sink ```python diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index c0d941d8ca6ae..53622750ef2ad 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3141,6 +3141,13 @@ Table table = tableEnv.fromValues( row("john", 35), row("sarah", 32)); +// Convert the Table into an append DataStream of Row by specifying the class +DataStream dsRow = tableEnv.toDataStream(table, Row.class); + +// Convert the Table into an append DataStream of Tuple2 with TypeInformation +TupleTypeInfo> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT()); +DataStream> dsTuple = tableEnv.toDataStream(table, tupleType); + // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. 
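Reviewer note: `toRetractStream` stays in the examples above only as a deprecated API; its designated successor per the Javadoc is `toChangelogStream`, where the boolean add/retract flag becomes a `RowKind` attached to each record. A sketch under the same assumptions (`tableEnv`, `table` as in the example):

```java
// Each emitted Row carries its change type as a RowKind
// (+I, -U, +U, -D) instead of a Tuple2<Boolean, T> flag.
DataStream<Row> changelog = tableEnv.toChangelogStream(table);
changelog
        .map(row -> row.getKind().shortString() + " " + row)
        .print();
```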
@@ -3161,6 +3168,13 @@ val table: Table = tableEnv.fromValues( row("john", 35), row("sarah", 32)) +// Convert the Table into an append DataStream of Row by specifying the class +val dsRow: DataStream[Row] = tableEnv.toDataStream[Row](table) + +// Convert the Table into an append DataStream of (String, Integer) with TypeInformation +val dsTuple: DataStream[(String, Int)] dsTuple = + tableEnv.toDataStream[(String, Int)](table) + // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. // The boolean field indicates the type of the change. diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 63c34234ab402..df7654f488806 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -260,6 +260,57 @@ def test_execute_async(self): execution_result = job_client.get_job_execution_result().result() self.assertEqual(str(job_id), str(execution_result.get_job_id())) + def test_add_python_file(self): + import uuid + env = self.env + python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) + os.mkdir(python_file_dir) + python_file_path = os.path.join(python_file_dir, "test_dep1.py") + with open(python_file_path, 'w') as f: + f.write("def add_two(a):\n return a + 2") + + def plus_two_map(value): + from test_dep1 import add_two + return add_two(value) + + get_j_env_configuration(env._j_stream_execution_environment).\ + setString("taskmanager.numberOfTaskSlots", "10") + env.add_python_file(python_file_path) + ds = env.from_collection([1, 2, 3, 4, 5]) + ds = ds.map(plus_two_map, Types.LONG()) \ + .slot_sharing_group("data_stream") \ + .map(lambda i: i, Types.LONG()) \ + .slot_sharing_group("table") + + python_file_path = os.path.join(python_file_dir, "test_dep2.py") + with open(python_file_path, 'w') as f: + f.write("def add_three(a):\n return a + 3") + + def plus_three(value): + from test_dep2 import add_three + return add_three(value) + + t_env = StreamTableEnvironment.create( + stream_execution_environment=env, + environment_settings=EnvironmentSettings.in_streaming_mode()) + env.add_python_file(python_file_path) + + from pyflink.table.udf import udf + from pyflink.table.expressions import col + add_three = udf(plus_three, result_type=DataTypes.BIGINT()) + + tab = t_env.from_data_stream(ds, col('a')) \ + .select(add_three(col('a'))) + t_env.to_data_stream(tab, Types.ROW([Types.LONG()])) \ + .map(lambda i: i[0]) \ + .add_sink(self.test_sink) + env.execute("test add_python_file") + result = self.test_sink.get_results(True) + expected = ['6', '7', '8', '9', '10'] + result.sort() + expected.sort() + self.assertEqual(expected, result) + def test_add_python_file_2(self): import uuid env = self.env diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 4e1d3e68ceb3c..f9136eced4034 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1784,6 +1784,29 @@ def to_data_stream(self, table: Table) -> DataStream: """ return DataStream(self._j_tenv.toDataStream(table._j_table)) + def to_data_stream(self, table: Table, type_info: TypeInformation) -> DataStream: + """ + Converts the given Table into a DataStream. 
+ + Since the DataStream API does not support changelog processing natively, this method + assumes append-only/insert-only semantics during the table-to-stream conversion. The records + of class Row will always describe RowKind#INSERT changes. Updating tables are + not supported by this method and will produce an exception. + + Note that the type system of the table ecosystem is richer than the one of the DataStream + API. The table runtime will make sure to properly serialize the output records to the first + operator of the DataStream API. Afterwards, the Types semantics of the DataStream API + need to be considered. + + If the input table contains a single rowtime column, it will be propagated into a stream + record's timestamp. Watermarks will be propagated as well. + + :param table: The Table to convert. + :param type_info: The TypeInformation that specifies the type of the DataStream. + :return: The converted DataStream. + """ + return DataStream(self._j_tenv.toDataStream(table._j_table, type_info.get_java_type_info())) + def to_changelog_stream(self, table: Table, target_schema: Schema = None, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 681dee6e7660c..14ba9513118a4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -143,7 +143,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti if (ds.withChangeFlag) { (ModifyKindSetTrait.ALL_CHANGES, "toRetractStream") } else { - (ModifyKindSetTrait.INSERT_ONLY, "toAppendStream") + (ModifyKindSetTrait.INSERT_ONLY, "toDataStream") } case _ => throw new UnsupportedOperationException( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 36d13c00b52a2..9dd47970ce66f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -1290,6 +1290,47 @@ class WindowAggregateITCase( .isEqualTo(expected.sorted.mkString("\n")) } + @TestTemplate + def testDistinctAggWithMergeOnEventTimeSessionWindow(): Unit = { + // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge + val sessionWindowTestData = List( + (1L, 2, "Hello"), // (1, Hello) - window + (2L, 2, "Hello"), // (1, Hello) - window, deduped + (8L, 2, "Hello"), // (2, Hello) - window, deduped during merge + (10L, 3, "Hello"), // (2, Hello) - window, forwarded during merge + (9L, 9, "Hello World"), // (1, Hello World) - window + (4L, 1, "Hello"), // (1, Hello) - window, triggering merge + (16L, 16, "Hello") + ) // (3, Hello) - window (not merged) + + val stream = failingDataSource(sessionWindowTestData) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) + val table = 
stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + tEnv.registerTable("MyTable", table) + + val sqlQuery = + """ + |SELECT c, + | COUNT(DISTINCT b), + | window_end + |FROM TABLE( + | SESSION(TABLE MyTable PARTITION BY c, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND)) + |GROUP BY c, window_start, window_end + """.stripMargin + val sink = new TestingAppendSink + tEnv.sqlQuery(sqlQuery).toDataStream.addSink(sink) + env.execute() + + val expected = Seq( + "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till {14L} + "Hello,1,1970-01-01T00:00:00.021", // window starts at [16L] till {21L}, not merged + "Hello,3,1970-01-01T00:00:00.015" // window starts at [1L,2L], + // merged with [8L,10L], by [4L], till {15L} + ) + assertThat(sink.getAppendResults.sorted.mkString("\n")) + .isEqualTo(expected.sorted.mkString("\n")) + } + @TestTemplate def testPercentileOnEventTimeTumbleWindow(): Unit = { val sql = From 6a56e179814b159f2eff8b5feacb9118d613dfde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Wed, 15 Jan 2025 13:35:19 +0530 Subject: [PATCH 08/12] cleanup for deprecated method --- .../datastream/intro_to_datastream_api.md | 4 +- .../python/datastream/operators/overview.md | 2 +- .../datastream/intro_to_datastream_api.md | 4 +- .../python/datastream/operators/overview.md | 2 +- .../content/docs/dev/table/data_stream_api.md | 6 ++ .../test_stream_execution_environment.py | 98 +++++++++---------- .../table/tests/test_table_environment_api.py | 17 ++++ 7 files changed, 78 insertions(+), 55 deletions(-) diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md index 514b22d2d97d7..6cd24202f74c1 100644 --- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,7 +227,7 @@ t_env.execute_sql(""" ) """) -ds = t_env.toDataStream( +ds = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) ``` @@ -266,7 +266,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.toDataStream(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream(table, Types.ROW([Types.INT(), Types.STRING()])) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md index 1641f825ee5cc..139029ec01306 100644 --- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md @@ -108,7 +108,7 @@ def data_stream_api_demo(): ) """) - ds = t_env.toDataStream( + ds = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md index 7d7a75cce0e11..190523235da6a 100644 --- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,7 +227,7 @@ t_env.execute_sql(""" ) """) -ds = t_env.toDataStream( +ds = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) ``` @@ -266,7 +266,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.toDataStream(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream(table, Types.ROW([Types.INT(), Types.STRING()])) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md index e5e1ef6df1e71..6d03bac47d968 100644 --- a/docs/content/docs/dev/python/datastream/operators/overview.md +++ b/docs/content/docs/dev/python/datastream/operators/overview.md @@ -108,7 +108,7 @@ def data_stream_api_demo(): ) """) - ds = t_env.toDataStream( + ds = t_env.to_data_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 53622750ef2ad..9459933741a6c 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3193,6 +3193,12 @@ table = t_env.from_elements([("john", 35), ("sarah", 32)], DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT())])) +# Convert the Table into an append DataStream of Row by specifying the type information +ds_row = t_env.to_data_stream(table, Types.ROW([Types.STRING(), Types.INT()])) + +# Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation +ds_tuple = t_env.to_data_stream(table, Types.TUPLE([Types.STRING(), Types.INT()])) + # Convert the Table into a retract DataStream of Row by specifying the type information # A retract stream of type X is a DataStream of Tuple[bool, X]. # The boolean field indicates the type of the change. 
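One wrinkle in the documented API state at this point: the docs above now describe both a one-argument `to_data_stream(table)` and a two-argument `to_data_stream(table, type_info)`, but Python classes cannot overload methods; a later `def` in a class body silently rebinds the name, so only one definition can actually survive. Below is a plain-Python sketch of that pitfall (the `FakeTableEnv` class and its return strings are invented for illustration); it is the reason a later patch in this series renames the new method.

```python
# Plain Python, no Flink needed. A class body executes top to bottom, so the
# second `def to_data_stream` rebinds the name and the first is silently lost.
class FakeTableEnv:
    def to_data_stream(self, table):
        return "converted using the table's own schema"

    def to_data_stream(self, table, type_info):  # replaces the method above
        return "converted using explicit type info: " + repr(type_info)


env = FakeTableEnv()
print(env.to_data_stream("my_table", "ROW<a INT, b STRING>"))  # fine
# env.to_data_stream("my_table") would raise:
# TypeError: to_data_stream() missing 1 required positional argument: 'type_info'
```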
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index df7654f488806..81bc16e37933a 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -261,55 +261,55 @@ def test_execute_async(self): self.assertEqual(str(job_id), str(execution_result.get_job_id())) def test_add_python_file(self): - import uuid - env = self.env - python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) - os.mkdir(python_file_dir) - python_file_path = os.path.join(python_file_dir, "test_dep1.py") - with open(python_file_path, 'w') as f: - f.write("def add_two(a):\n return a + 2") - - def plus_two_map(value): - from test_dep1 import add_two - return add_two(value) - - get_j_env_configuration(env._j_stream_execution_environment).\ - setString("taskmanager.numberOfTaskSlots", "10") - env.add_python_file(python_file_path) - ds = env.from_collection([1, 2, 3, 4, 5]) - ds = ds.map(plus_two_map, Types.LONG()) \ - .slot_sharing_group("data_stream") \ - .map(lambda i: i, Types.LONG()) \ - .slot_sharing_group("table") - - python_file_path = os.path.join(python_file_dir, "test_dep2.py") - with open(python_file_path, 'w') as f: - f.write("def add_three(a):\n return a + 3") - - def plus_three(value): - from test_dep2 import add_three - return add_three(value) - - t_env = StreamTableEnvironment.create( - stream_execution_environment=env, - environment_settings=EnvironmentSettings.in_streaming_mode()) - env.add_python_file(python_file_path) - - from pyflink.table.udf import udf - from pyflink.table.expressions import col - add_three = udf(plus_three, result_type=DataTypes.BIGINT()) - - tab = t_env.from_data_stream(ds, col('a')) \ - .select(add_three(col('a'))) - t_env.to_data_stream(tab, Types.ROW([Types.LONG()])) \ - .map(lambda i: i[0]) \ - .add_sink(self.test_sink) - env.execute("test add_python_file") - result = self.test_sink.get_results(True) - expected = ['6', '7', '8', '9', '10'] - result.sort() - expected.sort() - self.assertEqual(expected, result) + import uuid + env = self.env + python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) + os.mkdir(python_file_dir) + python_file_path = os.path.join(python_file_dir, "test_dep1.py") + with open(python_file_path, 'w') as f: + f.write("def add_two(a):\n return a + 2") + + def plus_two_map(value): + from test_dep1 import add_two + return add_two(value) + + get_j_env_configuration(env._j_stream_execution_environment).\ + setString("taskmanager.numberOfTaskSlots", "10") + env.add_python_file(python_file_path) + ds = env.from_collection([1, 2, 3, 4, 5]) + ds = ds.map(plus_two_map, Types.LONG()) \ + .slot_sharing_group("data_stream") \ + .map(lambda i: i, Types.LONG()) \ + .slot_sharing_group("table") + + python_file_path = os.path.join(python_file_dir, "test_dep2.py") + with open(python_file_path, 'w') as f: + f.write("def add_three(a):\n return a + 3") + + def plus_three(value): + from test_dep2 import add_three + return add_three(value) + + t_env = StreamTableEnvironment.create( + stream_execution_environment=env, + environment_settings=EnvironmentSettings.in_streaming_mode()) + env.add_python_file(python_file_path) + + from pyflink.table.udf import udf + from pyflink.table.expressions import col + add_three = udf(plus_three, result_type=DataTypes.BIGINT()) + + tab = t_env.from_data_stream(ds, 
col('a')) \ + .select(add_three(col('a'))) + t_env.to_data_stream(tab, Types.ROW([Types.LONG()])) \ + .map(lambda i: i[0]) \ + .add_sink(self.test_sink) + env.execute("test add_python_file") + result = self.test_sink.get_results(True) + expected = ['6', '7', '8', '9', '10'] + result.sort() + expected.sort() + self.assertEqual(expected, result) def test_add_python_file_2(self): import uuid diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 74c82128d1ea9..c15dcdf9ecc47 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -443,6 +443,23 @@ def test_to_data_stream_local_time(self): '+I[2, 1970-06-07, 08:09:10, 1970-06-07T08:09:10]'] self.assertEqual(expected, results) + def test_to_data_stream(self): + self.env.set_parallelism(1) + t_env = StreamTableEnvironment.create( + self.env, + environment_settings=EnvironmentSettings.in_streaming_mode()) + table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"]) + new_table = table.select(table.a + 1, table.b + 'flink', table.c) + ds = t_env.to_data_stream(table=new_table, type_info=Types.ROW([Types.LONG(), + Types.STRING(), + Types.STRING()])) + test_sink = DataStreamTestSinkFunction() + ds.add_sink(test_sink) + self.env.execute("test_to_data_stream") + result = test_sink.get_results(False) + expected = ['+I[2, Hiflink, Hello]', '+I[3, Helloflink, Hi]'] + self.assertEqual(result, expected) + def test_from_data_stream(self): self.env.set_parallelism(1) From cacce488056bcb80b3afec5a4a7dbf815649e2b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Wed, 15 Jan 2025 22:57:13 +0530 Subject: [PATCH 09/12] renamed method due to duplicate method name --- .../docs/dev/datastream/operators/process_function.md | 2 +- .../docs/dev/python/datastream/intro_to_datastream_api.md | 4 ++-- .../docs/dev/python/datastream/operators/overview.md | 2 +- .../docs/dev/datastream/operators/process_function.md | 2 +- .../docs/dev/python/datastream/intro_to_datastream_api.md | 4 ++-- .../docs/dev/python/datastream/operators/overview.md | 2 +- docs/content/docs/dev/table/data_stream_api.md | 4 ++-- .../docs/reference/pyflink.table/table_environment.rst | 1 + .../datastream/tests/test_stream_execution_environment.py | 2 +- flink-python/pyflink/table/table_environment.py | 2 +- .../pyflink/table/tests/test_table_environment_api.py | 6 +++--- 11 files changed, 16 insertions(+), 15 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index d5e6957439df4..b82696c9b07a6 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -313,7 +313,7 @@ if __name__ == '__main__': ) """) - stream = t_env.to_data_stream( + stream = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md index 6cd24202f74c1..cd3c5b80b4d9c 100644 --- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md +++ 
b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,7 +227,7 @@ t_env.execute_sql(""" ) """) -ds = t_env.to_data_stream( +ds = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) ``` @@ -266,7 +266,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_data_stream(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream_with_type(table, Types.ROW([Types.INT(), Types.STRING()])) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md index 139029ec01306..0344e3f4daaa7 100644 --- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md @@ -108,7 +108,7 @@ def data_stream_api_demo(): ) """) - ds = t_env.to_data_stream( + ds = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md index 7a68276bd69d4..72c9c52572e72 100644 --- a/docs/content/docs/dev/datastream/operators/process_function.md +++ b/docs/content/docs/dev/datastream/operators/process_function.md @@ -320,7 +320,7 @@ if __name__ == '__main__': ) """) - stream = t_env.to_data_stream( + stream = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md index 190523235da6a..c4c3e575d7747 100644 --- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,7 +227,7 @@ t_env.execute_sql(""" ) """) -ds = t_env.to_data_stream( +ds = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) ``` @@ -266,7 +266,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_data_stream(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream_with_type(table, Types.ROW([Types.INT(), Types.STRING()])) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md index 6d03bac47d968..2e33deccd7b7d 100644 --- a/docs/content/docs/dev/python/datastream/operators/overview.md +++ b/docs/content/docs/dev/python/datastream/operators/overview.md @@ -108,7 +108,7 @@ def data_stream_api_demo(): ) """) - ds = t_env.to_data_stream( + ds = t_env.to_data_stream_with_type( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 9459933741a6c..336fbd96ecb9d 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3194,10 +3194,10 @@ table = t_env.from_elements([("john", 35), ("sarah", 32)], DataTypes.FIELD("age", DataTypes.INT())])) # Convert the Table into an append DataStream of Row by specifying the type information -ds_row = t_env.to_data_stream(table, Types.ROW([Types.STRING(), Types.INT()])) +ds_row = t_env.to_data_stream_with_type(table, Types.ROW([Types.STRING(), Types.INT()])) # Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation -ds_tuple = t_env.to_data_stream(table, Types.TUPLE([Types.STRING(), Types.INT()])) +ds_tuple = t_env.to_data_stream_with_type(table, Types.TUPLE([Types.STRING(), Types.INT()])) # Convert the Table into a retract DataStream of Row by specifying the type information # A retract stream of type X is a DataStream of Tuple[bool, X]. 
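For readers following the rename: the `_with_type` variant keeps `TypeInformation`, the DataStream API's type system, as the source of truth, while the untouched one-argument `to_data_stream(table)` derives the stream type from the table schema expressed in `DataTypes`. A short sketch contrasting the two declarations for the same two-field record; the field names are borrowed from the `from_elements` example in these docs, and a standard PyFlink installation is assumed.

```python
from pyflink.common.typeinfo import Types
from pyflink.table import DataTypes

# DataStream-API type system: the kind of object that
# to_data_stream_with_type() consumes as its second argument.
row_type_info = Types.ROW([Types.STRING(), Types.INT()])

# Table-API type system: what the one-argument to_data_stream(table) derives
# by itself from the table schema (note that field names are part of it).
row_data_type = DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
                               DataTypes.FIELD("age", DataTypes.INT())])
```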
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst b/flink-python/docs/reference/pyflink.table/table_environment.rst index 3ab8a3bf65bda..bbe21e8dc1cbf 100644 --- a/flink-python/docs/reference/pyflink.table/table_environment.rst +++ b/flink-python/docs/reference/pyflink.table/table_environment.rst @@ -251,6 +251,7 @@ StreamTableEnvironment StreamTableEnvironment.sql_query StreamTableEnvironment.to_data_stream StreamTableEnvironment.to_changelog_stream + StreamTableEnvironment.to_data_stream_with_type StreamTableEnvironment.to_retract_stream StreamTableEnvironment.unload_module StreamTableEnvironment.use_catalog diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 81bc16e37933a..c0fb651fdc134 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -301,7 +301,7 @@ def plus_three(value): tab = t_env.from_data_stream(ds, col('a')) \ .select(add_three(col('a'))) - t_env.to_data_stream(tab, Types.ROW([Types.LONG()])) \ + t_env.to_data_stream_with_type(tab, Types.ROW([Types.LONG()])) \ .map(lambda i: i[0]) \ .add_sink(self.test_sink) env.execute("test add_python_file") diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index f9136eced4034..cbc4620066cef 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1784,7 +1784,7 @@ def to_data_stream(self, table: Table) -> DataStream: """ return DataStream(self._j_tenv.toDataStream(table._j_table)) - def to_data_stream(self, table: Table, type_info: TypeInformation) -> DataStream: + def to_data_stream_with_type(self, table: Table, type_info: TypeInformation) -> DataStream: """ Converts the given Table into a DataStream. 
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index c15dcdf9ecc47..483f48c8342de 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -443,19 +443,19 @@ def test_to_data_stream_local_time(self): '+I[2, 1970-06-07, 08:09:10, 1970-06-07T08:09:10]'] self.assertEqual(expected, results) - def test_to_data_stream(self): + def test_to_data_stream_with_type(self): self.env.set_parallelism(1) t_env = StreamTableEnvironment.create( self.env, environment_settings=EnvironmentSettings.in_streaming_mode()) table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"]) new_table = table.select(table.a + 1, table.b + 'flink', table.c) - ds = t_env.to_data_stream(table=new_table, type_info=Types.ROW([Types.LONG(), + ds = t_env.to_data_stream_with_type(table=new_table, type_info=Types.ROW([Types.LONG(), Types.STRING(), Types.STRING()])) test_sink = DataStreamTestSinkFunction() ds.add_sink(test_sink) - self.env.execute("test_to_data_stream") + self.env.execute("test_to_data_stream_with_type") result = test_sink.get_results(False) expected = ['+I[2, Hiflink, Hello]', '+I[3, Helloflink, Hi]'] self.assertEqual(result, expected) From 19bc007a51728cec5a81951657146531d97a08a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Thu, 16 Jan 2025 11:42:17 +0530 Subject: [PATCH 10/12] corrected indentation and doc --- docs/content.zh/docs/dev/table/data_stream_api.md | 7 ++++--- docs/content/docs/dev/table/data_stream_api.md | 7 ++++--- .../pyflink/table/tests/test_table_environment_api.py | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index 7c5157b78dac7..eb1873bbdc118 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -3178,11 +3178,12 @@ val table: Table = tableEnv.fromValues( row("sarah", 32)) // Convert the Table into an append DataStream of Row by specifying the class -val dsRow: DataStream[Row] = tableEnv.toDataStream[Row](table) +val dsRow: DataStream[Row] = tableEnv.toDataStream(table) // Convert the Table into an append DataStream of (String, Integer) with TypeInformation -val dsTuple: DataStream[(String, Int)] dsTuple = - tableEnv.toDataStream[(String, Int)](table) +case class User(var name: String, var age: java.lang.Integer) +val dsTuple: DataStream[User] = + tableEnv.toDataStream(table, classOf[User]) // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>.
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 336fbd96ecb9d..4eff2932063d8 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3169,11 +3169,12 @@ val table: Table = tableEnv.fromValues( row("sarah", 32)) // Convert the Table into an append DataStream of Row by specifying the class -val dsRow: DataStream[Row] = tableEnv.toDataStream[Row](table) +val dsRow: DataStream[Row] = tableEnv.toDataStream(table) // Convert the Table into an append DataStream of (String, Integer) with TypeInformation -val dsTuple: DataStream[(String, Int)] dsTuple = - tableEnv.toDataStream[(String, Int)](table) +case class User(var name: String, var age: java.lang.Integer) +val dsTuple: DataStream[User] = + tableEnv.toDataStream(table, classOf[User]) // Convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream>. diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 483f48c8342de..1801cb3235a7d 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -451,8 +451,8 @@ def test_to_data_stream_with_type(self): table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"]) new_table = table.select(table.a + 1, table.b + 'flink', table.c) ds = t_env.to_data_stream_with_type(table=new_table, type_info=Types.ROW([Types.LONG(), - Types.STRING(), - Types.STRING()])) + Types.STRING(), + Types.STRING()])) test_sink = DataStreamTestSinkFunction() ds.add_sink(test_sink) self.env.execute("test_to_data_stream_with_type") From 8b39adebe6d68df1561cdaff2995889d152cc30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Fri, 17 Jan 2025 10:33:18 +0530 Subject: [PATCH 11/12] corrected test failure --- flink-python/pyflink/table/table_environment.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index fb90d789ccc4b..19e8daef88a45 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1816,7 +1816,8 @@ def to_data_stream_with_type(self, table: Table, type_info: TypeInformation) -> :param type_info: The TypeInformation that specifies the type of the DataStream. :return: The converted DataStream.
""" - return DataStream(self._j_tenv.toDataStream(table._j_table, type_info.get_java_type_info())) + j_data_stream = self._j_tenv.toDataStream(table._j_table, type_info.get_java_type_info()) + return DataStream(j_data_stream=j_data_stream) def to_changelog_stream(self, table: Table, From c1900ad5a0acf5ed663f3e536a7bfe267711ef84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cshalini-m20=E2=80=9D?= <“shalini.m20@ibm.com”> Date: Sat, 18 Jan 2025 15:23:15 +0530 Subject: [PATCH 12/12] correcte test failures --- .../datastream/operators/process_function.md | 5 ++-- .../datastream/intro_to_datastream_api.md | 7 +++--- .../python/datastream/operators/overview.md | 5 ++-- .../datastream/operators/process_function.md | 5 ++-- .../datastream/intro_to_datastream_api.md | 7 +++--- .../python/datastream/operators/overview.md | 5 ++-- .../content/docs/dev/table/data_stream_api.md | 4 ++-- .../pyflink.table/table_environment.rst | 1 - .../test_stream_execution_environment.py | 2 +- .../pyflink/table/table_environment.py | 24 ------------------- .../table/tests/test_table_environment_api.py | 17 ------------- 11 files changed, 17 insertions(+), 65 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index b82696c9b07a6..69fe2559f0ee3 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -313,9 +313,8 @@ if __name__ == '__main__': ) """) - stream = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) + stream = t_env.to_data_stream( + t_env.from_path('my_source')) watermarked_stream = stream.assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md index cd3c5b80b4d9c..e72fcb313435e 100644 --- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,9 +227,8 @@ t_env.execute_sql(""" ) """) -ds = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream( + t_env.from_path('my_source')) ``` Note The StreamExecutionEnvironment `env` should be specified @@ -266,7 +265,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_data_stream_with_type(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream(table) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md index 0344e3f4daaa7..107140088f705 100644 --- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md @@ -108,9 +108,8 @@ def data_stream_api_demo(): ) """) - ds = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) + ds = t_env.to_data_stream( + t_env.from_path('my_source')) def split(s): splits = s[1].split("|") diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md index 72c9c52572e72..b4bf9d8e3f14f 100644 --- a/docs/content/docs/dev/datastream/operators/process_function.md +++ b/docs/content/docs/dev/datastream/operators/process_function.md @@ -320,9 +320,8 @@ if __name__ == '__main__': ) """) - stream = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) + stream = t_env.to_data_stream( + t_env.from_path('my_source')) watermarked_stream = stream.assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md index c4c3e575d7747..038765c0120b0 100644 --- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md +++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md @@ -227,9 +227,8 @@ t_env.execute_sql(""" ) """) -ds = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream( + t_env.from_path('my_source')) ``` Note The StreamExecutionEnvironment `env` should be specified @@ -266,7 +265,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse. 
table = t_env.from_data_stream(ds, 'a, b, c') # convert a Table to a DataStream -ds = t_env.to_data_stream_with_type(table, Types.ROW([Types.INT(), Types.STRING()])) +ds = t_env.to_data_stream(table) # or ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()])) ``` diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md index 2e33deccd7b7d..853b74a47b439 100644 --- a/docs/content/docs/dev/python/datastream/operators/overview.md +++ b/docs/content/docs/dev/python/datastream/operators/overview.md @@ -108,9 +108,8 @@ def data_stream_api_demo(): ) """) - ds = t_env.to_data_stream_with_type( - t_env.from_path('my_source'), - Types.ROW([Types.INT(), Types.STRING()])) + ds = t_env.to_data_stream( + t_env.from_path('my_source')) def split(s): splits = s[1].split("|") diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 4eff2932063d8..0d1d364f319c4 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -3195,10 +3195,10 @@ table = t_env.from_elements([("john", 35), ("sarah", 32)], DataTypes.FIELD("age", DataTypes.INT())])) # Convert the Table into an append DataStream of Row by specifying the type information -ds_row = t_env.to_data_stream_with_type(table, Types.ROW([Types.STRING(), Types.INT()])) +ds_row = t_env.to_data_stream(table) # Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation -ds_tuple = t_env.to_data_stream_with_type(table, Types.TUPLE([Types.STRING(), Types.INT()])) +ds_tuple = t_env.to_data_stream(table) # Convert the Table into a retract DataStream of Row by specifying the type information # A retract stream of type X is a DataStream of Tuple[bool, X]. 
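With the `_with_type` variant gone, the two documentation calls in the hunk above become identical, and both `ds_row` and `ds_tuple` are now plain `Row` streams typed from the table schema. Below is a minimal sketch of consuming such a stream, mirroring the positional access used by the updated `test_add_python_file` later in this patch; it assumes a streaming `t_env` and a `table` set up as earlier on that doc page.

```python
# to_data_stream(table) yields Row records whose type comes from the table's
# schema; individual fields are read positionally, as in the updated test.
ds_row = t_env.to_data_stream(table)
names = ds_row.map(lambda row: row[0])  # first column of each Row
names.print()
```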
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst b/flink-python/docs/reference/pyflink.table/table_environment.rst index 6e0601b476c4c..3945de781e382 100644 --- a/flink-python/docs/reference/pyflink.table/table_environment.rst +++ b/flink-python/docs/reference/pyflink.table/table_environment.rst @@ -253,7 +253,6 @@ StreamTableEnvironment StreamTableEnvironment.sql_query StreamTableEnvironment.to_data_stream StreamTableEnvironment.to_changelog_stream - StreamTableEnvironment.to_data_stream_with_type StreamTableEnvironment.to_retract_stream StreamTableEnvironment.unload_module StreamTableEnvironment.use_catalog diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index c0fb651fdc134..64b74d78d7d02 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -301,7 +301,7 @@ def plus_three(value): tab = t_env.from_data_stream(ds, col('a')) \ .select(add_three(col('a'))) - t_env.to_data_stream_with_type(tab, Types.ROW([Types.LONG()])) \ + t_env.to_data_stream(tab) \ .map(lambda i: i[0]) \ .add_sink(self.test_sink) env.execute("test add_python_file") diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 19e8daef88a45..2c2c74b6f9727 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1795,30 +1795,6 @@ def to_data_stream(self, table: Table) -> DataStream: """ return DataStream(self._j_tenv.toDataStream(table._j_table)) - def to_data_stream_with_type(self, table: Table, type_info: TypeInformation) -> DataStream: - """ - Converts the given Table into a DataStream. - - Since the DataStream API does not support changelog processing natively, this method - assumes append-only/insert-only semantics during the table-to-stream conversion. The records - of class Row will always describe RowKind#INSERT changes. Updating tables are - not supported by this method and will produce an exception. - - Note that the type system of the table ecosystem is richer than the one of the DataStream - API. The table runtime will make sure to properly serialize the output records to the first - operator of the DataStream API. Afterwards, the Types semantics of the DataStream API - need to be considered. - - If the input table contains a single rowtime column, it will be propagated into a stream - record's timestamp. Watermarks will be propagated as well. - - :param table: The Table to convert. - :param type_info: The TypeInformation that specifies the type of the DataStream. - :return: The converted DataStream. 
- """ - j_data_stream = self._j_tenv.toDataStream(table._j_table, type_info.get_java_type_info()) - return DataStream(j_data_stream=j_data_stream) - def to_changelog_stream(self, table: Table, target_schema: Schema = None, diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 9d3ba09c7efe3..5363e4d53d85f 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -443,23 +443,6 @@ def test_to_data_stream_local_time(self): '+I[2, 1970-06-07, 08:09:10, 1970-06-07T08:09:10]'] self.assertEqual(expected, results) - def test_to_data_stream_with_type(self): - self.env.set_parallelism(1) - t_env = StreamTableEnvironment.create( - self.env, - environment_settings=EnvironmentSettings.in_streaming_mode()) - table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"]) - new_table = table.select(table.a + 1, table.b + 'flink', table.c) - ds = t_env.to_data_stream_with_type(table=new_table, type_info=Types.ROW([Types.LONG(), - Types.STRING(), - Types.STRING()])) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) - self.env.execute("test_to_data_stream_with_type") - result = test_sink.get_results(False) - expected = ['+I[2, Hiflink, Hello]', '+I[3, Helloflink, Hi]'] - self.assertEqual(result, expected) - def test_from_data_stream(self): self.env.set_parallelism(1)