From 0694fde595289b4ae145544fb39598fee63e8bf5 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 13 Jul 2016 13:51:05 +0200 Subject: [PATCH 1/3] [FLINK-3729][table] Fix ExplainTest on Windows OS --- .../api/table/BatchTableEnvironment.scala | 9 ++++--- .../flink/api/java/batch/ExplainTest.java | 24 +++++++++---------- .../flink/api/scala/batch/ExplainTest.scala | 24 +++++++++---------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index 1ba13be15cc8f..eb4c8191ea173 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -174,9 +174,12 @@ abstract class BatchTableEnvironment( val jasonSqlPlan = env.getExecutionPlan val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) - s"== Abstract Syntax Tree ==\n" + - s"$ast\n" + - s"== Physical Execution Plan ==\n" + + s"== Abstract Syntax Tree ==" + + System.lineSeparator + + s"$ast" + + System.lineSeparator + + s"== Physical Execution Plan ==" + + System.lineSeparator + s"$sqlPlan" } diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/ExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/ExplainTest.java index 29cc0d9a0c027..747cd921a5321 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/ExplainTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/ExplainTest.java @@ -50,10 +50,10 @@ public void testFilterWithoutExtended() throws Exception { .fromDataSet(input, "a, b") .filter("a % 2 = 0"); - String result = tableEnv.explain(table); + String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testFilter0.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } @@ -68,10 +68,10 @@ public void testFilterWithExtended() throws Exception { .fromDataSet(input, "a, b") .filter("a % 2 = 0"); - String result = tableEnv.explain(table, true); + String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testFilter1.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } @@ -90,10 +90,10 @@ public void testJoinWithoutExtended() throws Exception { .where("b = d") .select("a, c"); - String result = tableEnv.explain(table); + String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testJoin0.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } @@ -112,10 +112,10 @@ public void testJoinWithExtended() throws Exception { .where("b = d") .select("a, c"); - String result = tableEnv.explain(table, true); + String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testJoin1.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } @@ -131,10 +131,10 @@ public void testUnionWithoutExtended() throws Exception { Table table2 = tableEnv.fromDataSet(input2, "count, word"); Table table = table1.unionAll(table2); - String result = tableEnv.explain(table); + String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testUnion0.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } @@ -150,10 +150,10 @@ public void testUnionWithExtended() throws Exception { Table table2 = tableEnv.fromDataSet(input2, "count, word"); Table table = table1.unionAll(table2); - String result = tableEnv.explain(table, true); + String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); try (Scanner scanner = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testUnion1.out"))){ - String source = scanner.useDelimiter("\\A").next(); + String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); assertEquals(source, result); } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala index 77bb4719e5e4c..ab70ec53190d2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala @@ -39,9 +39,9 @@ class ExplainTest .toTable(tEnv, 'a, 'b) .filter("a % 2 = 0") - val result = tEnv.explain(table) + val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter0.out").mkString + "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -54,9 +54,9 @@ class ExplainTest .toTable(tEnv, 'a, 'b) .filter("a % 2 = 0") - val result = tEnv.explain(table, true) + val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter1.out").mkString + "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -69,9 +69,9 @@ class ExplainTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd) val table = table1.join(table2).where("b = d").select("a, c") - val result = tEnv.explain(table) + val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin0.out").mkString + "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -84,9 +84,9 @@ class ExplainTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd) val table = table1.join(table2).where("b = d").select("a, c") - val result = tEnv.explain(table, true) + val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin1.out").mkString + "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -99,9 +99,9 @@ class ExplainTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1.unionAll(table2) - val result = tEnv.explain(table) + val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion0.out").mkString + "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -114,9 +114,9 @@ class ExplainTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1.unionAll(table2) - val result = tEnv.explain(table, true) + val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion1.out").mkString + "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } } From d79866a78b30d6794c6c96431ac53d6a5119890d Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 13 Jul 2016 13:51:34 +0200 Subject: [PATCH 2/3] [FLINK-3729][table] Fix TableSinkITCase on Windows OS --- .../org/apache/flink/api/scala/batch/TableSinkITCase.scala | 2 +- .../org/apache/flink/api/scala/stream/TableSinkITCase.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala index dd0668c5108ed..f89b90d9edfda 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -47,7 +47,7 @@ class TableSinkITCase( val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp") tmpFile.deleteOnExit() - val path = "file:///" + tmpFile.getAbsolutePath + val path = tmpFile.toURI.toString val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala index 9dfee11b18b54..f3eb87cb251f2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala @@ -38,7 +38,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp") tmpFile.deleteOnExit() - val path = "file:///" + tmpFile.getAbsolutePath + val path = tmpFile.toURI.toString val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) From 31c121c25cae70262c0187e43b41a144a326ab33 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 13 Jul 2016 13:51:53 +0200 Subject: [PATCH 3/3] [hotfix][table] Remove unused imports in TableSinkITCase --- .../apache/flink/api/scala/batch/TableSinkITCase.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala index f89b90d9edfda..407fa4cefb9ae 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -20,17 +20,13 @@ package org.apache.flink.api.scala.batch import java.io.File -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, TableEnvironment} -import org.apache.flink.api.table.sinks.{CsvTableSink, TableSink, BatchTableSink} -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.sinks.CsvTableSink +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.junit.Test import org.junit.runner.RunWith