Skip to content

Commit

Permalink
[FLINK-33718][table] Clean up the usage of deprecated StreamTableEnvironment#toAppendStream
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Dec 7, 2023
1 parent fac3ac7 commit a3ee787
Show file tree
Hide file tree
Showing 55 changed files with 583 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,18 @@ private void testStreamingWrite(
Types.STRING,
Types.STRING,
Types.STRING));
/*tEnv.createTemporaryView(
"my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));*/
tEnv.createTemporaryView(
"my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));

"my_table",
stream,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.STRING())
.column("f3", DataTypes.STRING())
.column("f4", DataTypes.STRING())
.build());
// DDL
tEnv.executeSql(
"create external table sink_table (a int,b string,c string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -146,7 +147,11 @@ public static void main(String[] args) throws Exception {
Table result = tEnv.sqlQuery(finalAgg);
// convert Table into append-only DataStream
DataStream<Row> resultStream =
tEnv.toAppendStream(result, Types.ROW(Types.INT, Types.SQL_TIMESTAMP));
tEnv.toDataStream(
result,
DataTypes.ROW(
DataTypes.INT(),
DataTypes.TIMESTAMP().bridgedTo(java.sql.Timestamp.class)));

final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public static RowData validateRow(RowData rowData, RowType rowType) throws Excep
Table table = tableEnv.fromDataStream(rows);
tableEnv.createTemporaryView("t", table);
table = tableEnv.sqlQuery("select * from t");
List<RowData> resultRows =
tableEnv.toAppendStream(table, InternalTypeInfo.of(rowType)).executeAndCollect(1);
return resultRows.get(0);
List<Object> resultRows =
tableEnv.toDataStream(table, InternalTypeInfo.of(rowType).getDataType())
.executeAndCollect(1);
return (RowData) resultRows.get(0);
}

public static byte[] rowToPbBytes(RowData row, Class messageClass) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.runtime.operators.python.scalar;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
Expand All @@ -33,6 +32,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.python.PythonEnv;
Expand Down Expand Up @@ -224,7 +224,7 @@ public void testPythonScalarFunctionOperatorIsChainedByDefault() {
DataStream<Tuple2<Integer, Integer>> ds = env.fromData(new Tuple2<>(1, 2));
Table t = tEnv.fromDataStream(ds, $("a"), $("b")).select(call("pyFunc", $("a"), $("b")));
// force generating the physical plan for the given table
tEnv.toAppendStream(t, BasicTypeInfo.INT_TYPE_INFO);
tEnv.toDataStream(t, DataTypes.INT());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
assertThat(vertices).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void testAppendStreamDoesNotOverwriteTableConfig() {
Duration minRetention = Duration.ofMinutes(1);
tEnv.getConfig().setIdleStateRetention(minRetention);
Table table = tEnv.fromDataStream(elements);
tEnv.toAppendStream(table, Row.class);
tEnv.toDataStream(table);

assertThat(tEnv.getConfig().getIdleStateRetention()).isEqualTo(minRetention);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class StreamTableEnvironmentImplTest {
val retention = Duration.ofMinutes(1)
tEnv.getConfig.setIdleStateRetention(retention)
val table = tEnv.fromDataStream(elements)
tEnv.toAppendStream[Row](table)
tEnv.toDataStream(table)

assertThat(tEnv.getConfig.getMinIdleStateRetentionTime).isEqualTo(retention.toMillis)
assertThat(tEnv.getConfig.getMaxIdleStateRetentionTime).isEqualTo(retention.toMillis * 3 / 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.listener.CatalogListener1;
import org.apache.flink.table.catalog.listener.CatalogListener2;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -66,7 +65,7 @@ void testPassingExecutionParameters() {

// trigger translation
Table table = tEnv.sqlQuery("SELECT * FROM test");
tEnv.toAppendStream(table, Row.class);
tEnv.toDataStream(table);

assertThat(env.getParallelism()).isEqualTo(128);
assertThat(env.getConfig().getAutoWatermarkInterval()).isEqualTo(800);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
import org.junit.jupiter.api.TestTemplate
import org.junit.jupiter.api.extension.ExtendWith

import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
import java.time.{Duration, Instant, LocalDateTime, ZoneId, ZoneOffset}

@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class TimeAttributesITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
Expand All @@ -47,11 +47,13 @@ class TimeAttributesITCase(mode: StateBackendMode) extends StreamingWithStateTes

def cancel() {}
})
tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
tEnv.createTemporaryView("test", stream, Schema.newBuilder()
.columnByMetadata("event_time", DataTypes.TIMESTAMP(3), "rowtime", true)
.build())
val result = tEnv.sqlQuery("SELECT * FROM test")

val sink = new TestingAppendSink()
tEnv.toAppendStream[Row](result).addSink(sink)
tEnv.toDataStream(result).addSink(sink)

assertThatThrownBy(() => env.execute())
.hasMessageNotContaining("Rowtime timestamp is not defined. Please make sure that a " +
Expand All @@ -73,18 +75,20 @@ class TimeAttributesITCase(mode: StateBackendMode) extends StreamingWithStateTes
}
)

tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
tEnv.createTemporaryView("test", stream, Schema.newBuilder()
.columnByMetadata("event_time", DataTypes.TIMESTAMP(3), "rowtime", true)
.build())
val result = tEnv.sqlQuery("SELECT * FROM test")

val sink = new TestingAppendSink()
tEnv.toAppendStream[Row](result).addSink(sink)
tEnv.toDataStream(result).addSink(sink)
env.execute()

val formattedData = data.map {
case (timestamp, data) =>
val formattedTimestamp =
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC).toString
s"$formattedTimestamp,$data"
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).toString
s"$timestamp,$data,$formattedTimestamp"
}
assertThat(formattedData.sorted).isEqualTo(sink.getAppendResults.sorted)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) {
checkEmptyFile(sink1Path)

val table = streamTableEnv.sqlQuery("select last from MyTable where id > 0")
val resultSet = streamTableEnv.toAppendStream(table, classOf[Row])
val resultSet = streamTableEnv.toDataStream(table)
val sink = new TestingAppendSink
resultSet.addSink(sink)

Expand Down Expand Up @@ -236,7 +236,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) {
checkEmptyFile(sink1Path)

val table = streamTableEnv.sqlQuery("select last from MyTable where id > 0")
val resultSet = streamTableEnv.toAppendStream(table, classOf[Row])
val resultSet = streamTableEnv.toDataStream(table)
val sink = new TestingAppendSink
resultSet.addSink(sink)

Expand Down Expand Up @@ -281,7 +281,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) {
checkEmptyFile(sink1Path)

val table = streamTableEnv.sqlQuery("select last from MyTable where id > 0")
val resultSet = streamTableEnv.toAppendStream[Row](table)
val resultSet = streamTableEnv.toDataStream(table)
val sink = new TestingAppendSink
resultSet.addSink(sink)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
"from T1"

assertThatExceptionOfType(classOf[TableException])
.isThrownBy(() => streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row])
.isThrownBy(() => streamUtil.tableEnv.sqlQuery(sqlQuery).toDataStream)
}

/** OVER clause is necessary for [[OverAgg0]] window function. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ class AggregateValidationTest extends TableTestBase {
sql: String,
keywords: String,
clazz: Class[_ <: Throwable] = classOf[ValidationException]): Unit = {
val callable: ThrowingCallable = () =>
util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql))
val callable: ThrowingCallable = () => util.tableEnv.toDataStream(util.tableEnv.sqlQuery(sql))
if (keywords != null) {
assertThatExceptionOfType(clazz)
.isThrownBy(callable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class LegacyTableSinkValidationTest extends TableTestBase {
() => {
t.groupBy('text)
.select('text, 'id.count, 'num.sum)
.toAppendStream[Row]
.toDataStream
.addSink(new TestingAppendSink)
env.execute()
})
Expand Down Expand Up @@ -93,7 +93,7 @@ class LegacyTableSinkValidationTest extends TableTestBase {
ds1
.leftOuterJoin(ds2, 'a === 'd && 'b === 'h)
.select('c, 'g)
.toAppendStream[Row]
.toDataStream
.addSink(new TestingAppendSink)
env.execute()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class SetOperatorsValidationTest extends TableTestBase {
.isThrownBy(
() => {
val unionDs = ds1.unionAll(ds2)
unionDs.toAppendStream[Row].addSink(sink)
unionDs.toDataStream.addSink(sink)
env.execute()
})
assertThat(sink.getAppendResults.isEmpty).isTrue
Expand All @@ -67,7 +67,7 @@ class SetOperatorsValidationTest extends TableTestBase {
.isThrownBy(
() => {
val unionDs = ds1.unionAll(ds2)
unionDs.toAppendStream[Row].addSink(sink)
unionDs.toDataStream.addSink(sink)
env.execute()
})
assertThat(sink.getAppendResults.isEmpty).isTrue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class MyPojo() {
}

class NonPojo {
val x = new java.util.HashMap[String, String]()
var x = new java.util.HashMap[String, String]()

override def toString: String = x.toString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
val t1 = tEnv.sqlQuery(sql)

tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val outputType = Array(
DataTypes.BIGINT().getLogicalType,
DataTypes.STRING().getLogicalType,
Expand Down Expand Up @@ -174,7 +174,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
""".stripMargin
val t1 = tEnv.sqlQuery(sql)

val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down Expand Up @@ -277,7 +277,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
val t1 = tEnv.sqlQuery(sql)

tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down Expand Up @@ -367,7 +367,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
""".stripMargin
val t1 = tEnv.sqlQuery(sql)

val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down Expand Up @@ -465,7 +465,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
val t1 = tEnv.sqlQuery(sql)

tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down Expand Up @@ -592,7 +592,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
val t1 = tEnv.sqlQuery(sql)

tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down Expand Up @@ -714,7 +714,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
val t1 = tEnv.sqlQuery(sql)

tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "OverAggregate")
val assertor = new RowDataHarnessAssertor(
Array(
DataTypes.BIGINT().getLogicalType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
|GROUP BY `name`, window_start, window_end
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "WindowAggregate")
// window aggregate put window properties at the end of aggs
val outputTypes =
Array(
Expand Down Expand Up @@ -194,7 +194,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
|GROUP BY `name`, window_start, window_end
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "WindowAggregate")
// window aggregate put window properties at the end of aggs
val assertor = new RowDataHarnessAssertor(
Array(
Expand Down Expand Up @@ -328,7 +328,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
|GROUP BY `name`, window_start, window_end
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "WindowAggregate")
// window aggregate put window properties at the end of aggs
val assertor = new RowDataHarnessAssertor(
Array(
Expand Down Expand Up @@ -509,7 +509,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
|GROUP BY `name`, window_start, window_end
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val stream: DataStream[Row] = t1.toAppendStream[Row]
val stream: DataStream[Row] = t1.toDataStream

val testHarness = createHarnessTesterForNoState(stream, "LocalWindowAggregate")
// window aggregate put window properties at the end of aggs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class WindowAggregateUseDaylightTimeHarnessTest(backend: StateBackendMode, timeZ
|GROUP BY `name`, window_start, window_end
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
val testHarness = createHarnessTester(t1.toDataStream, "WindowAggregate")
// window aggregate put window properties at the end of aggs
val assertor = new RowDataHarnessAssertor(
Array(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class WindowTableFunctionHarnessTest(backend: StateBackendMode, shiftTimeZone: Z
|FROM TABLE(TUMBLE(TABLE T1, DESCRIPTOR(proctime), INTERVAL '5' SECOND))
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTesterForNoState(t1.toAppendStream[Row], "WindowTableFunction")
val testHarness =
createHarnessTesterForNoState(t1.toDataStream, "WindowTableFunction")

testHarness.open()
ingestData(testHarness)
Expand Down Expand Up @@ -207,7 +208,8 @@ class WindowTableFunctionHarnessTest(backend: StateBackendMode, shiftTimeZone: Z
| HOP(TABLE T1, DESCRIPTOR(proctime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTesterForNoState(t1.toAppendStream[Row], "WindowTableFunction")
val testHarness =
createHarnessTesterForNoState(t1.toDataStream, "WindowTableFunction")

testHarness.open()
ingestData(testHarness)
Expand Down Expand Up @@ -424,7 +426,8 @@ class WindowTableFunctionHarnessTest(backend: StateBackendMode, shiftTimeZone: Z
| CUMULATE(TABLE T1, DESCRIPTOR(proctime), INTERVAL '5' SECOND, INTERVAL '15' SECOND))
""".stripMargin
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTesterForNoState(t1.toAppendStream[Row], "WindowTableFunction")
val testHarness =
createHarnessTesterForNoState(t1.toDataStream, "WindowTableFunction")

testHarness.open()
ingestData(testHarness)
Expand Down
Loading

0 comments on commit a3ee787

Please sign in to comment.