Skip to content

Commit

Permalink
[FLINK-33646][table] Cleanup the rest usage of deprecated TableEnviro…
Browse files Browse the repository at this point in the history
…nment#registerFunction and in docs
  • Loading branch information
snuyanzin committed Nov 25, 2023
1 parent 2613428 commit d12d49e
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 85 deletions.
8 changes: 4 additions & 4 deletions docs/content.zh/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum

// 注册函数
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
tEnv.createTemporarySystemFunction("wAvg", WeightedAvg.class);

// 使用函数
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
Expand Down Expand Up @@ -1355,7 +1355,7 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {

// 注册函数
val tEnv: StreamTableEnvironment = ???
tEnv.registerFunction("wAvg", new WeightedAvg())
tEnv.createTemporarySystemFunction("wAvg", WeightedAvg.class)

// 使用函数
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
Expand Down Expand Up @@ -1797,7 +1797,7 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>

// 注册函数
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());
tEnv.createTemporarySystemFunction("top2", Top2.class);

// 初始化表
Table tab = ...;
Expand Down Expand Up @@ -1940,7 +1940,7 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>

// 注册函数
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());
tEnv.createTemporarySystemFunction("top2", Top2.class);

// 初始化表
Table tab = ...;
Expand Down
63 changes: 33 additions & 30 deletions docs/content.zh/docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ result = orders.over_window(Over
Table orders = tEnv.from("Orders");

// 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合
tEnv.registerFunction("myUdagg", new MyUdagg());
tEnv.createTemporarySystemFunction("myUdagg", MyUdagg.class);
orders.groupBy("users")
.select(
$("users"),
Expand Down Expand Up @@ -1026,8 +1026,7 @@ join 表和表函数的结果。左(外部)表的每一行都会 join 表函
{{< tab "Java" >}}
```java
// 注册 User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);

// join
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -1075,8 +1074,7 @@ join 表和表函数的结果。左(外部)表的每一行都会 join 表函
{{< tab "Java" >}}
```java
// 注册 User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);

// join
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -1128,7 +1126,7 @@ Table ratesHistory = tableEnv.from("RatesHistory");
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
"r_proctime",
"r_currency");
tableEnv.registerFunction("rates", rates);
tableEnv.createTemporarySystemFunction("rates", rates);

// 基于时间属性和键与“Orders”表关联
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -2175,19 +2173,14 @@ table = input.over_window([w: OverWindow].alias("w"))
使用用户定义的标量函数或内置标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。

```java
@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW<s1 STRING, s2 STRING>"))
public class MyMapFunction extends ScalarFunction {
public Row eval(String a) {
return Row.of(a, "pre-" + a);
}

@Override
public TypeInformation<?> getResultType(Class<?>[] signature) {
return Types.ROW(Types.STRING(), Types.STRING());
}
}

ScalarFunction func = new MyMapFunction();
tableEnv.registerFunction("func", func);
tableEnv.createTemporarySystemFunction("func", MyMapFunction.class);

Table table = input
.map(call("func", $("c")).as("a", "b"));
Expand Down Expand Up @@ -2252,6 +2245,7 @@ table = input.map(pandas_func).alias('a', 'b')
使用表函数执行 `flatMap` 操作。

```java
@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW<s1 STRING, i INT>"))
public class MyFlatMapFunction extends TableFunction<Row> {

public void eval(String str) {
Expand All @@ -2262,15 +2256,9 @@ public class MyFlatMapFunction extends TableFunction<Row> {
}
}
}

@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING(), Types.INT());
}
}

TableFunction func = new MyFlatMapFunction();
tableEnv.registerFunction("func", func);
tableEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class);

Table table = input
.flatMap(call("func", $("c")).as("a", "b"));
Expand Down Expand Up @@ -2364,13 +2352,21 @@ public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
}

@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.INT, Types.INT);
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.typedArguments(DataTypes.INT())
.accumulatorTypeStrategy(
TypeStrategies.explicit(
DataTypes.STRUCTURED(
MyMinMaxAcc.class,
DataTypes.FIELD("min", DataTypes.INT()),
DataTypes.FIELD("max", DataTypes.INT()))))
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT()))
.build();
}
}

AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class);
Table table = input
.groupBy($("key"))
.aggregate(call("myAggFunc", $("a")).as("x", "y"))
Expand Down Expand Up @@ -2406,9 +2402,17 @@ class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
}

override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(Types.INT, Types.INT)
}
override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
TypeInference.newBuilder
.typedArguments(DataTypes.INT)
.accumulatorTypeStrategy(
TypeStrategies.explicit(
DataTypes.STRUCTURED(
classOf[MyMinMaxAcc],
DataTypes.FIELD("min", DataTypes.INT),
DataTypes.FIELD("max", DataTypes.INT))))
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT))
.build
}

val myAggFunc = new MyMinMax
Expand Down Expand Up @@ -2491,8 +2495,7 @@ t.aggregate(pandas_udaf.alias("a", "b")) \
{{< tabs "group-window-agg" >}}
{{< tab "Java" >}}
```java
AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class);

Table table = input
.window(Tumble.over(lit(5).minutes())
Expand Down Expand Up @@ -2596,7 +2599,7 @@ public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2A
}
}

tEnv.registerFunction("top2", new Top2());
tEnv.createTemporarySystemFunction("top2", Top2.class);
Table orders = tableEnv.from("Orders");
Table result = orders
.groupBy($("key"))
Expand Down
63 changes: 33 additions & 30 deletions docs/content/docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ User-defined aggregation function can also be used with `DISTINCT` modifiers. To
Table orders = tEnv.from("Orders");

// Use distinct aggregation for user-defined aggregate functions
tEnv.registerFunction("myUdagg", new MyUdagg());
tEnv.createTemporarySystemFunction("myUdagg", MyUdagg.class);
orders.groupBy("users")
.select(
$("users"),
Expand Down Expand Up @@ -1025,8 +1025,7 @@ A row of the left (outer) table is dropped, if its table function call returns a
{{< tab "Java" >}}
```java
// register User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);

// join
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -1074,8 +1073,7 @@ Currently, the predicate of a table function left outer join can only be empty o
{{< tab "Java" >}}
```java
// register User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);

// join
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -1127,7 +1125,7 @@ Table ratesHistory = tableEnv.from("RatesHistory");
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
"r_proctime",
"r_currency");
tableEnv.registerFunction("rates", rates);
tableEnv.createTemporarySystemFunction("rates", rates);

// join with "Orders" based on the time attribute and key
Table orders = tableEnv.from("Orders");
Expand Down Expand Up @@ -2174,19 +2172,14 @@ The row-based operations generate outputs with multiple columns.
Performs a map operation with a user-defined scalar function or built-in scalar function. The output will be flattened if the output type is a composite type.

```java
@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW<s1 STRING, s2 STRING>"))
public class MyMapFunction extends ScalarFunction {
public Row eval(String a) {
return Row.of(a, "pre-" + a);
}

@Override
public TypeInformation<?> getResultType(Class<?>[] signature) {
return Types.ROW(Types.STRING, Types.STRING);
}
}

ScalarFunction func = new MyMapFunction();
tableEnv.registerFunction("func", func);
tableEnv.createTemporarySystemFunction("func", MyMapFunction.class);

Table table = input
.map(call("func", $("c"))).as("a", "b");
Expand Down Expand Up @@ -2251,6 +2244,7 @@ table = input.map(pandas_func).alias('a', 'b')
Performs a `flatMap` operation with a table function.

```java
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public class MyFlatMapFunction extends TableFunction<Row> {

public void eval(String str) {
Expand All @@ -2261,15 +2255,9 @@ public class MyFlatMapFunction extends TableFunction<Row> {
}
}
}

@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.INT);
}
}

TableFunction func = new MyFlatMapFunction();
tableEnv.registerFunction("func", func);
tableEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class);

Table table = input
.flatMap(call("func", $("c"))).as("a", "b");
Expand Down Expand Up @@ -2363,13 +2351,21 @@ public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
}

@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.INT, Types.INT);
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.typedArguments(DataTypes.INT())
.accumulatorTypeStrategy(
TypeStrategies.explicit(
DataTypes.STRUCTURED(
MyMinMaxAcc.class,
DataTypes.FIELD("min", DataTypes.INT()),
DataTypes.FIELD("max", DataTypes.INT()))))
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT()))
.build();
}
}

AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class);
Table table = input
.groupBy($("key"))
.aggregate(call("myAggFunc", $("a")).as("x", "y"))
Expand Down Expand Up @@ -2405,9 +2401,17 @@ class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
}

override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(Types.INT, Types.INT)
}
override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
TypeInference.newBuilder
.typedArguments(DataTypes.INT)
.accumulatorTypeStrategy(
TypeStrategies.explicit(
DataTypes.STRUCTURED(
classOf[MyMinMaxAcc],
DataTypes.FIELD("min", DataTypes.INT),
DataTypes.FIELD("max", DataTypes.INT))))
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT))
.build
}

val myAggFunc = new MyMinMax
Expand Down Expand Up @@ -2490,8 +2494,7 @@ Groups and aggregates a table on a [group window](#group-window) and possibly on
{{< tabs "group-window-agg" >}}
{{< tab "Java" >}}
```java
AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class);

Table table = input
.window(Tumble.over(lit(5).minutes())
Expand Down Expand Up @@ -2596,7 +2599,7 @@ public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2A
}
}

tEnv.registerFunction("top2", new Top2());
tEnv.createTemporarySystemFunction("top2", Top2.class);
Table orders = tableEnv.from("Orders");
Table result = orders
.groupBy($("key"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public interface AggregatedTable {
* <p>Example:
*
* <pre>{@code
* AggregateFunction aggFunc = new MyAggregateFunction();
* tableEnv.registerFunction("aggFunc", aggFunc);
* tableEnv.createTemporarySystemFunction("aggFunc", MyAggregateFunction.class);
* table.groupBy($("key"))
* .aggregate(call("aggFunc", $("a"), $("b")).as("f0", "f1", "f2"))
* .select($("key"), $("f0"), $("f1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public interface FlatAggregateTable {
* <p>Example:
*
* <pre>{@code
* TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
* tableEnv.registerFunction("tableAggFunc", tableAggFunc);
* tableEnv.createTemporarySystemFunction("tableAggFunc", MyTableAggregateFunction.class);
* tab.groupBy($("key"))
* .flatAggregate(call("tableAggFunc", $("a"), $("b")).as("x", "y", "z"))
* .select($("key"), $("x"), $("y"), $("z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public interface GroupedTable {
* <p>Example:
*
* <pre>{@code
* AggregateFunction aggFunc = new MyAggregateFunction();
* tableEnv.registerFunction("aggFunc", aggFunc);
* tableEnv.createTemporarySystemFunction("aggFunc", MyAggregateFunction.class);
* tab.groupBy($("key"))
* .aggregate(call("aggFunc", $("a"), $("b")).as("f0", "f1", "f2"))
* .select($("key"), $("f0"), $("f1"));
Expand All @@ -76,8 +75,7 @@ public interface GroupedTable {
* <p>Example:
*
* <pre>{@code
* TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
* tableEnv.registerFunction("tableAggFunc", tableAggFunc);
* tableEnv.createTemporarySystemFunction("tableAggFunc", MyTableAggregateFunction.class);
* tab.groupBy($("key"))
* .flatAggregate(call("tableAggFunc", $("a"), $("b")).as("x", "y", "z"))
* .select($("key"), $("x"), $("y"), $("z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
@TestTemplate
def testTableAggregate(): Unit = {
val top3 = new Top3WithMapView
tEnv.registerFunction("top3", top3)
tEnv.createTemporarySystemFunction("top3", top3)
val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
val resultTable = source
.groupBy('a)
Expand Down Expand Up @@ -161,7 +161,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
private def createTableAggregateWithRetract()
: (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
val top3 = new Top3WithRetractInput
tEnv.registerFunction("top3", top3)
tEnv.createTemporarySystemFunction("top3", top3)
val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
val resultTable = source
.groupBy('a)
Expand Down
Loading

0 comments on commit d12d49e

Please sign in to comment.