Skip to content

Commit

Permalink
[fixup] Now json plan tests are checking that we correctly set transf…
Browse files Browse the repository at this point in the history
…ormation uids

Signed-off-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Feb 10, 2022
1 parent 1349ef7 commit f8e3980
Show file tree
Hide file tree
Showing 20 changed files with 132 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc2;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.UdfWithOpen;
Expand All @@ -43,15 +42,14 @@ public void testSimpleCalc() throws Exception {
File sinkPath =
createTestCsvSinkTable("MySink", "a bigint", "a1 varchar", "b int", "c1 varchar");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select "
+ "a, "
+ "cast(a as varchar) as a1, "
+ "b, "
+ "substring(c, 1, 8) as c1 "
+ "from MyTable where b > 1");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable where b > 1")
.await();

assertResult(Collections.singletonList("3,3,2,hello wo"), sinkPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void testDeduplication() throws Exception {
tableEnv.getConfig()
.getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
checkTransformationUids(compiledPlan);
tableEnv.executePlan(compiledPlan).await();

assertResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
Expand Down Expand Up @@ -57,13 +56,12 @@ public void testExpand() throws Exception {
"c varchar");
createTestNonInsertOnlyValuesSinkTable(
"MySink", "b bigint", "a bigint", "c varchar", "primary key (b) not enforced");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select b, "
+ "count(distinct a) as a, "
+ "max(c) as c "
+ "from MyTable group by b");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(Arrays.asList("+I[1, 1, Hi]", "+I[2, 2, Hello world]"), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
Expand Down Expand Up @@ -86,14 +85,13 @@ public void testSimpleAggCallsWithGroupBy() throws Exception {
"avg_a double",
"min_c varchar",
"primary key (b) not enforced");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select b, "
+ "count(*) as cnt, "
+ "avg(a) filter (where a > 1) as avg_a, "
+ "min(c) as min_c "
+ "from MyTable group by b");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result);
Expand All @@ -119,17 +117,16 @@ public void testDistinctAggCalls() throws Exception {
"avg_b double",
"cnt_d bigint",
"primary key (e) not enforced");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select e, "
+ "count(distinct a) filter (where b > 10) as cnt_a1, "
+ "count(distinct a) as cnt_a2, "
+ "sum(distinct a) as sum_a, "
+ "sum(distinct b) as sum_b, "
+ "avg(b) as avg_b, "
+ "count(distinct d) as concat_d "
+ "from MyTable group by e");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand Down Expand Up @@ -162,15 +159,14 @@ public void testUserDefinedAggCallsWithoutMerge() throws Exception {
"s3 bigint",
"primary key (d) not enforced");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select "
+ "e, "
+ "my_sum1(c, 10) as s1, "
+ "my_sum2(5, c) as s2, "
+ "my_avg(e, a) as s3 "
+ "from MyTable group by e");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand All @@ -194,14 +190,13 @@ public void testUserDefinedAggCallsWithMerge() throws Exception {
createTestNonInsertOnlyValuesSinkTable(
"MySink", "d bigint", "s1 bigint", "c1 varchar", "primary key (d) not enforced");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select "
+ "e, "
+ "my_avg(e, a) as s1, "
+ "my_concat_agg(d) as c1 "
+ "from MyTable group by e");
tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
Expand Down Expand Up @@ -69,8 +68,7 @@ public void testEventTimeTumbleWindow() throws Exception {
"cnt BIGINT",
"sum_int INT",
"distinct_cnt BIGINT");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select\n"
+ " name,\n"
+ " TUMBLE_START(rowtime, INTERVAL '5' SECOND) as window_start,\n"
Expand All @@ -79,8 +77,8 @@ public void testEventTimeTumbleWindow() throws Exception {
+ " SUM(`int`),\n"
+ " COUNT(DISTINCT `string`)\n"
+ "FROM MyTable\n"
+ "GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)");
tableEnv.executePlan(compiledPlan).await();
+ "GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand All @@ -97,14 +95,13 @@ public void testEventTimeTumbleWindow() throws Exception {
@Test
public void testEventTimeHopWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select\n"
+ " name,\n"
+ " COUNT(*)\n"
+ "FROM MyTable\n"
+ "GROUP BY name, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");
tableEnv.executePlan(compiledPlan).await();
+ "GROUP BY name, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand All @@ -126,14 +123,13 @@ public void testEventTimeHopWindow() throws Exception {
@Test
public void testEventTimeSessionWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select\n"
+ " name,\n"
+ " COUNT(*)\n"
+ "FROM MyTable\n"
+ "GROUP BY name, Session(rowtime, INTERVAL '3' SECOND)");
tableEnv.executePlan(compiledPlan).await();
+ "GROUP BY name, Session(rowtime, INTERVAL '3' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
Expand Down Expand Up @@ -78,13 +77,11 @@ public void testIncrementalAggregate()
"c varchar");
createTestNonInsertOnlyValuesSinkTable(
"MySink", "b bigint", "a bigint", "primary key (b) not enforced");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink select b, "
+ "count(distinct a) as a "
+ "from MyTable group by b");

tableEnv.executePlan(compiledPlan).await();
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -50,15 +49,14 @@ public void testProcessTimeInnerJoin() throws Exception {
"T2", rowT2, "a int", "b bigint", "c varchar", "proctime as PROCTIME()");
createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink "
+ "SELECT t2.a, t2.c, t1.c\n"
+ "FROM T1 as t1 join T2 as t2 ON\n"
+ " t1.a = t2.a AND\n"
+ " t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n"
+ " t2.proctime + INTERVAL '5' SECOND");
tableEnv.executePlan(compiledPlan).await();
+ " t2.proctime + INTERVAL '5' SECOND")
.await();
List<String> expected =
Arrays.asList(
"+I[1, HiHi, Hi1]",
Expand Down Expand Up @@ -100,15 +98,14 @@ public void testRowTimeInnerJoin() throws Exception {
"watermark for rowtime as rowtime - INTERVAL '5' second");
createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink \n"
+ "SELECT t2.a, t2.c, t1.c\n"
+ "FROM T1 as t1 join T2 as t2 ON\n"
+ " t1.a = t2.a AND\n"
+ " t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n"
+ " t2.rowtime + INTERVAL '6' SECOND");
tableEnv.executePlan(compiledPlan).await();
+ " t2.rowtime + INTERVAL '6' SECOND")
.await();
List<String> expected =
Arrays.asList(
"+I[1, HiHi, Hi1]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.runtime.stream.jsonplan;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
Expand Down Expand Up @@ -66,8 +65,7 @@ public void testNonWindowInnerJoin() throws Exception {
createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar");
File sinkPath = createTestCsvSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink "
+ "SELECT t2.a, t2.c, t1.c\n"
+ "FROM (\n"
Expand All @@ -76,8 +74,8 @@ public void testNonWindowInnerJoin() throws Exception {
+ "JOIN (\n"
+ " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T2\n"
+ ") as t2\n"
+ "ON t1.a = t2.a AND t1.b > t2.b");
tableEnv.executePlan(compiledPlan).await();
+ "ON t1.a = t2.a AND t1.b > t2.b")
.await();
List<String> expected =
Arrays.asList(
"1,HiHi,Hi2",
Expand All @@ -100,8 +98,7 @@ public void testIsNullInnerJoinWithNullCond() throws Exception {
createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar");
createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink "
+ "SELECT t2.a, t2.c, t1.c\n"
+ "FROM (\n"
Expand All @@ -113,8 +110,8 @@ public void testIsNullInnerJoinWithNullCond() throws Exception {
+ "ON \n"
+ " ((t1.a is null AND t2.a is null) OR\n"
+ " (t1.a = t2.a))\n"
+ " AND t1.b > t2.b");
tableEnv.executePlan(compiledPlan).await();
+ " AND t1.b > t2.b")
.await();
List<String> expected =
Arrays.asList(
"+I[1, HiHi, Hi2]",
Expand All @@ -130,10 +127,8 @@ public void testIsNullInnerJoinWithNullCond() throws Exception {
@Test
public void testJoin() throws Exception {
createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
"insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2");
tableEnv.executePlan(compiledPlan).await();
compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2")
.await();
List<String> expected =
Arrays.asList(
"+I[Hello world, Hallo Welt]", "+I[Hello, Hallo Welt]", "+I[Hi, Hallo]");
Expand All @@ -143,34 +138,30 @@ public void testJoin() throws Exception {
@Test
public void testInnerJoin() throws Exception {
createTestValuesSinkTable("MySink", "a1 int", "b1 int");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
"insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1");
tableEnv.executePlan(compiledPlan).await();
compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1")
.await();
List<String> expected = Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[2, 2]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
}

@Test
public void testJoinWithFilter() throws Exception {
createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink \n"
+ "SELECT a3, b4 FROM A, B where a2 = b2 and a2 < 2");
tableEnv.executePlan(compiledPlan).await();
+ "SELECT a3, b4 FROM A, B where a2 = b2 and a2 < 2")
.await();
List<String> expected = Arrays.asList("+I[Hi, Hallo]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
}

@Test
public void testInnerJoinWithDuplicateKey() throws Exception {
createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int");
CompiledPlan compiledPlan =
tableEnv.compilePlanSql(
compileSqlAndExecutePlan(
"insert into MySink \n"
+ "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3");
tableEnv.executePlan(compiledPlan).await();
+ "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3")
.await();
List<String> expected = Arrays.asList("+I[2, 2, 2]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
}
Expand Down
Loading

0 comments on commit f8e3980

Please sign in to comment.