Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4a70d1f
[FLINK-36488] [TABLE SQL/API] Remove deprecated methods StreamTableEn…
Oct 16, 2024
6da38a6
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 11, 2025
3676175
removing python test case having dependency on the removed deprecated…
Jan 12, 2025
8c869c8
removed toAppendStream from Scala StreamTableEnvironment and python t…
Jan 13, 2025
30eed1d
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 13, 2025
7cfb0ba
removed toAppendStream method reference from WindowAggregateITCase
Jan 13, 2025
721188e
cleanup for toAppendStream and to_append_stream from docs and fixed r…
Jan 14, 2025
fb0b5d8
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 14, 2025
03443a1
fixed review comments
Jan 14, 2025
24bc5fd
Fixed review comments and cleanup for deprecated method
Jan 15, 2025
ed78575
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 15, 2025
6a56e17
cleanup for deprecated method
Jan 15, 2025
5c5886c
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 15, 2025
cacce48
renamed method due to duplicate method name
Jan 15, 2025
780bae6
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 16, 2025
6c7a6b4
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 16, 2025
19bc007
corrected indentation and doc
Jan 16, 2025
62ea99b
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 17, 2025
8b39ade
corrected test failure
Jan 17, 2025
f6eac34
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 18, 2025
c1900ad
correcte test failures
Jan 18, 2025
ad21377
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 27, 2025
31b7780
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Jan 28, 2025
3250f79
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 10, 2025
fb478df
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 11, 2025
d13bd85
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 13, 2025
e7e0bff
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 13, 2025
1a8f489
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 20, 2025
2831f98
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Feb 21, 2025
448ef27
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 10, 2025
d776e32
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 10, 2025
b54b518
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 17, 2025
7a6ecaf
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 17, 2025
42d6f04
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 19, 2025
5e0c2a2
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 19, 2025
2fe8376
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 19, 2025
104d741
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 21, 2025
06431ec
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 24, 2025
f82ce3b
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 25, 2025
1f5cc63
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 28, 2025
fcad748
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Mar 31, 2025
c5ed33e
Merge branch 'apache:master' into 36488-remove-deprecated-methods
sn-12-3 Apr 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,8 @@ if __name__ == '__main__':
)
""")

stream = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
stream = t_env.to_data_stream(
t_env.from_path('my_source'))
watermarked_stream = stream.assign_timestamps_and_watermarks(
WatermarkStrategy.for_monotonous_timestamps()
.with_timestamp_assigner(MyTimestampAssigner()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ t_env.execute_sql("""
)
""")

ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(
t_env.from_path('my_source'))
```

<span class="label label-info">Note</span> The StreamExecutionEnvironment `env` should be specified
Expand Down Expand Up @@ -266,7 +265,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse.
table = t_env.from_data_stream(ds, 'a, b, c')

# convert a Table to a DataStream
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(table)
# or
ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ def data_stream_api_demo():
)
""")

ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(
t_env.from_path('my_source'))

def split(s):
splits = s[1].split("|")
Expand Down
11 changes: 6 additions & 5 deletions docs/content.zh/docs/dev/table/data_stream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3151,11 +3151,11 @@ Table table = tableEnv.fromValues(
row("sarah", 32));

// Convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
DataStream<Row> dsRow = tableEnv.toDataStream(table, Row.class);

// Convert the Table into an append DataStream of Tuple2<String, Integer> with TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toDataStream(table, tupleType);

// Convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
Expand All @@ -3178,11 +3178,12 @@ val table: Table = tableEnv.fromValues(
row("sarah", 32))

// Convert the Table into an append DataStream of Row by specifying the class
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
val dsRow: DataStream[Row] = tableEnv.toDataStream(table)

// Convert the Table into an append DataStream of (String, Integer) with TypeInformation
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
case class User(var name: String, var age: java.lang.Integer)
val dsTuple: DataStream[User] dsTuple =
tableEnv.toDataStream(table, DataTypes.of(User.class))

// Convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,8 @@ if __name__ == '__main__':
)
""")

stream = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
stream = t_env.to_data_stream(
t_env.from_path('my_source'))
watermarked_stream = stream.assign_timestamps_and_watermarks(
WatermarkStrategy.for_monotonous_timestamps()
.with_timestamp_assigner(MyTimestampAssigner()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ t_env.execute_sql("""
)
""")

ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(
t_env.from_path('my_source'))
```

<span class="label label-info">Note</span> The StreamExecutionEnvironment `env` should be specified
Expand Down Expand Up @@ -266,7 +265,7 @@ It also supports to convert a `DataStream` to a `Table` and vice verse.
table = t_env.from_data_stream(ds, 'a, b, c')

# convert a Table to a DataStream
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(table)
# or
ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
```
Expand Down
5 changes: 2 additions & 3 deletions docs/content/docs/dev/python/datastream/operators/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ def data_stream_api_demo():
)
""")

ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
ds = t_env.to_data_stream(
t_env.from_path('my_source'))

def split(s):
splits = s[1].split("|")
Expand Down
15 changes: 8 additions & 7 deletions docs/content/docs/dev/table/data_stream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3142,11 +3142,11 @@ Table table = tableEnv.fromValues(
row("sarah", 32));

// Convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
DataStream<Row> dsRow = tableEnv.toDataStream(table, Row.class);

// Convert the Table into an append DataStream of Tuple2<String, Integer> with TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toDataStream(table, tupleType);

// Convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
Expand All @@ -3169,11 +3169,12 @@ val table: Table = tableEnv.fromValues(
row("sarah", 32))

// Convert the Table into an append DataStream of Row by specifying the class
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
val dsRow: DataStream[Row] = tableEnv.toDataStream(table)

// Convert the Table into an append DataStream of (String, Integer) with TypeInformation
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
case class User(var name: String, var age: java.lang.Integer)
val dsTuple: DataStream[User] dsTuple =
tableEnv.toDataStream(table, DataTypes.of(User.class))

// Convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
Expand All @@ -3194,10 +3195,10 @@ table = t_env.from_elements([("john", 35), ("sarah", 32)],
DataTypes.FIELD("age", DataTypes.INT())]))

# Convert the Table into an append DataStream of Row by specifying the type information
ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()]))
ds_row = t_env.to_data_stream(table)

# Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation
ds_tuple = t_env.to_append_stream(table, Types.TUPLE([Types.STRING(), Types.INT()]))
ds_tuple = t_env.to_data_stream(table)

# Convert the Table into a retract DataStream of Row by specifying the type information
# A retract stream of type X is a DataStream of Tuple[bool, X].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ StreamTableEnvironment
StreamTableEnvironment.sql_query
StreamTableEnvironment.to_data_stream
StreamTableEnvironment.to_changelog_stream
StreamTableEnvironment.to_append_stream
StreamTableEnvironment.to_retract_stream
StreamTableEnvironment.unload_module
StreamTableEnvironment.use_catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def plus_three(value):

tab = t_env.from_data_stream(ds, col('a')) \
.select(add_three(col('a')))
t_env.to_append_stream(tab, Types.ROW([Types.LONG()])) \
t_env.to_data_stream(tab) \
.map(lambda i: i[0]) \
.add_sink(self.test_sink)
env.execute("test add_python_file")
Expand Down
18 changes: 0 additions & 18 deletions flink-python/pyflink/table/table_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1952,24 +1952,6 @@ def to_changelog_stream(self,
target_schema._j_schema,
changelog_mode._j_changelog_mode))

def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream:
"""
Converts the given Table into a DataStream of a specified type. The Table must only have
insert (append) changes. If the Table is also modified by update or delete changes, the
conversion will fail.

The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are
mapped by position, field types must match.

:param table: The Table to convert.
:param type_info: The TypeInformation that specifies the type of the DataStream.
:return: The converted DataStream.

.. versionadded:: 1.12.0
"""
j_data_stream = self._j_tenv.toAppendStream(table._j_table, type_info.get_java_type_info())
return DataStream(j_data_stream=j_data_stream)

def to_retract_stream(self, table: Table, type_info: TypeInformation) -> DataStream:
"""
Converts the given Table into a DataStream of add and retract messages. The message will be
Expand Down
17 changes: 0 additions & 17 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,23 +600,6 @@ def test_from_and_to_changelog_stream_event_time(self):
actual_results.sort()
self.assertEqual(expected_results, actual_results)

def test_to_append_stream(self):
self.env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
self.env,
environment_settings=EnvironmentSettings.in_streaming_mode())
table = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hi")], ["a", "b", "c"])
new_table = table.select(table.a + 1, table.b + 'flink', table.c)
ds = t_env.to_append_stream(table=new_table, type_info=Types.ROW([Types.LONG(),
Types.STRING(),
Types.STRING()]))
test_sink = DataStreamTestSinkFunction()
ds.add_sink(test_sink)
self.env.execute("test_to_append_stream")
result = test_sink.get_results(False)
expected = ['+I[2, Hiflink, Hello]', '+I[3, Helloflink, Hi]']
self.assertEqual(result, expected)

def test_to_retract_stream(self):
self.env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,61 +763,6 @@ DataStream<Row> toChangelogStream(
@Deprecated
<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);

/**
* Converts the given {@link Table} into an append {@link DataStream} of a specified type.
*
* <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
* modified by update or delete changes, the conversion will fail.
*
* <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
*
* <ul>
* <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
* by position, field types must match.
* <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
* </ul>
*
* @param table The {@link Table} to convert.
* @param clazz The class of the type of the resulting {@link DataStream}.
* @param <T> The type of the resulting {@link DataStream}.
* @return The converted {@link DataStream}.
* @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type
* system and supports all kinds of {@link DataTypes} that the table runtime can produce.
* The semantics might be slightly different for raw and structured types. Use {@code
* toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should
* be used as source of truth.
*/
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

/**
* Converts the given {@link Table} into an append {@link DataStream} of a specified type.
*
* <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
* modified by update or delete changes, the conversion will fail.
*
* <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
*
* <ul>
* <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
* by position, field types must match.
* <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
* </ul>
*
* @param table The {@link Table} to convert.
* @param typeInfo The {@link TypeInformation} that specifies the type of the {@link
* DataStream}.
* @param <T> The type of the resulting {@link DataStream}.
* @return The converted {@link DataStream}.
* @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type
* system and supports all kinds of {@link DataTypes} that the table runtime can produce.
* The semantics might be slightly different for raw and structured types. Use {@code
* toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should
* be used as source of truth.
*/
@Deprecated
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

/**
* Converts the given {@link Table} into a {@link DataStream} of add and retract messages. The
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed the comment for toRetractStream but kept the comment for toAppendStream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. Apologies, corrected now.

* message will be encoded as {@link Tuple2}. The first field is a {@link Boolean} flag, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.MutableURLClassLoader;
Expand Down Expand Up @@ -316,22 +315,6 @@ public <T> void createTemporaryView(
createTemporaryView(path, fromDataStream(dataStream, fields));
}

@Override
public <T> DataStream<T> toAppendStream(Table table, Class<T> clazz) {
TypeInformation<T> typeInfo = extractTypeInformation(table, clazz);
return toAppendStream(table, typeInfo);
}

@Override
public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo) {
OutputConversionModifyOperation modifyOperation =
new OutputConversionModifyOperation(
table.getQueryOperation(),
TypeConversions.fromLegacyInfoToDataType(typeInfo),
OutputConversionModifyOperation.UpdateMode.APPEND);
return toStreamInternal(table, modifyOperation);
}

@Override
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz) {
TypeInformation<T> typeInfo = extractTypeInformation(table, clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,32 +721,6 @@ trait StreamTableEnvironment extends TableEnvironment {
@deprecated
def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified by
* update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[Row]] and Scala Tuple types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* @param table
* The [[Table]] to convert.
* @tparam T
* The type of the resulting [[DataStream]].
* @return
* The converted [[DataStream]].
* @deprecated
* Use [[toDataStream(Table, Class)]] instead. It integrates with the new type system and
* supports all kinds of [[DataTypes]] that the table runtime can produce. The semantics might
* be slightly different for raw and structured types. Use
* `toDataStream(DataTypes.of(Types.of[Class]))` if [[TypeInformation]] should be used as source
* of truth.
*/
@deprecated
def toAppendStream[T: TypeInformation](table: Table): DataStream[T]

/**
* Converts the given [[Table]] into a [[DataStream]] of add and retract messages. The message
* will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second field holds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,40 +189,6 @@ class TableConversions(table: Table) {
// Legacy before FLIP-136
// ----------------------------------------------------------------------------------------------

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified by
* update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* @tparam T
* The type of the resulting [[DataStream]].
* @return
* The converted [[DataStream]].
* @deprecated
* Use [[toDataStream(Table, Class)]] instead. It integrates with the new type system and
* supports all kinds of [[DataTypes]] that the table runtime can produce. The semantics might
* be slightly different for raw and structured types. Use
* `toDataStream(DataTypes.of(Types.of[Class]))` if [[TypeInformation]] should be used as source
* of truth.
*/
@deprecated
def toAppendStream[T: TypeInformation]: DataStream[T] = {
internalEnv match {
case tEnv: StreamTableEnvironment =>
tEnv.toAppendStream(table)
case _ =>
throw new ValidationException(
"Only tables that originate from Scala DataStreams " +
"can be converted to Scala DataStreams.")
}
}

/**
* Converts the [[Table]] to a [[DataStream]] of add and retract messages. The message will be
* encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second field holds the record
Expand Down
Loading