From 8105e15ae2c7d7a37caa9ef463420b63d6f5ef19 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Tue, 14 Mar 2017 18:08:14 +0800 Subject: [PATCH 1/2] [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase occasionally fails --- .../datastream/DataStreamUserDefinedFunctionITCase.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala index 853c7715d7fda..f308134405095 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala @@ -85,6 +85,8 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB def testUserDefinedTableFunctionWithParameter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + val tableFunc1 = new RichTableFunc1 tEnv.registerFunction("RichTableFunc1", tableFunc1) UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " ")) @@ -107,6 +109,8 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + val tableFunc1 = new RichTableFunc1 val richFunc2 = new RichFunc2 tEnv.registerFunction("RichTableFunc1", tableFunc1) @@ -199,6 +203,8 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB def testTableFunctionWithVariableArguments(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + val varArgsFunc0 = new VarArgsFunc0 tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0) From 2c16f1a4a05f49f292fa03e1d5dfec9709a9f3f8 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Tue, 14 Mar 2017 18:26:22 +0800 Subject: [PATCH 2/2] [FLINK-6040] [table] Modification as Kurt's comments --- .../DataStreamUserDefinedFunctionITCase.scala | 42 +++++-------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala index f308134405095..2e8a065e79c42 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala @@ -27,18 +27,22 @@ import org.apache.flink.table.expressions.utils.{Func13, RichFunc2} import org.apache.flink.table.utils._ import org.apache.flink.types.Row import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import scala.collection.mutable class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase { - @Test - def testCrossJoin(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) + + @Before + def clear(): Unit = { StreamITCase.clear + } + @Test + def testCrossJoin(): Unit = { val t = testData(env).toTable(tEnv).as('a, 'b, 'c) val func0 = new TableFunc0 val pojoFunc0 = new PojoTableFunc() @@ -60,10 +64,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testLeftOuterJoin(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) val func0 = new TableFunc0 @@ -83,10 +83,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testUserDefinedTableFunctionWithParameter(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val tableFunc1 = new RichTableFunc1 tEnv.registerFunction("RichTableFunc1", tableFunc1) UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " ")) @@ -107,10 +103,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val tableFunc1 = new RichTableFunc1 val richFunc2 = new RichFunc2 tEnv.registerFunction("RichTableFunc1", tableFunc1) @@ -141,10 +133,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testTableFunctionConstructorWithParams(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) val config = Map("key1" -> "value1", "key2" -> "value2") val func30 = new TableFunc3(null) @@ -176,10 +164,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testScalarFunctionConstructorWithParams(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) val func0 = new Func13("default") val func1 = new Func13("Sunny") @@ -201,15 +185,11 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB @Test def testTableFunctionWithVariableArguments(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tableEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - val varArgsFunc0 = new VarArgsFunc0 - tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0) + tEnv.registerFunction("VarArgsFunc0", varArgsFunc0) val result = testData(env) - .toTable(tableEnv, 'a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c) .select('c) .join(varArgsFunc0("1", "2", 'c))