From e84bff0b3c633d0fc8c77f4678a3266c0dad194c Mon Sep 17 00:00:00 2001 From: Shuyi Chen Date: Thu, 9 Nov 2017 00:05:20 -0800 Subject: [PATCH] use PEEK_FIELDS_NO_EXPAND for CompositeRelDataType --- .../plan/schema/CompositeRelDataType.scala | 4 +- .../table/runtime/batch/sql/CalcITCase.scala | 38 +++++++++++++++++++ .../runtime/batch/table/CalcITCase.scala | 13 +++++++ .../table/runtime/stream/sql/SqlITCase.scala | 21 ++++++++++ .../runtime/stream/table/CalcITCase.scala | 16 ++++++++ .../table/runtime/utils/StreamTestData.scala | 9 +++++ 6 files changed, 100 insertions(+), 1 deletion(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala index e0c6b6ffcc460..f8c61fbfc88ab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala @@ -38,7 +38,9 @@ class CompositeRelDataType( val compositeType: CompositeType[_], val nullable: Boolean, typeFactory: FlinkTypeFactory) - extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) { + extends RelRecordType( + StructKind.PEEK_FIELDS_NO_EXPAND, + createFieldList(compositeType, typeFactory)) { override def toString = s"COMPOSITE($compositeType)" diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala index b891a7d3cf1b4..db3bed8926ad9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala @@ -66,6 +66,25 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testSelectStarFromNestedTable(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Test def testSelectStarFromDataSet(): Unit = { @@ -90,6 +109,25 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testSelectStarFromNestedDataSet(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.getSmallNestedTupleDataSet(env) + tEnv.registerDataSet("MyTable", ds, 'a, 'b) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Test def testSimpleSelectAll(): Unit = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index e947c3f3bed55..246a7d0eacc3c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -130,6 +130,19 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testSelectStarFromNestedTable(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*) + + val expected = + "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Test def testAllRejectingFilter(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index c49af5cc626e3..513df846e3d63 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -151,6 +151,27 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } + /** test select star **/ + @Test + def testSelectStarFromNestedTable(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val sqlQuery = "SELECT * FROM MyTable" + + val t = StreamTestData.getSmallNestedTupleDataStream(env).toTable(tEnv).as('a, 'b) + tEnv.registerTable("MyTable", t) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List("(1,1),one", "(2,2),two", "(3,3),three") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + /** test selection **/ @Test def testSelectExpressionFromTable(): Unit = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala index 480d817d4a3cf..7a4b2fa9bf5ff 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala @@ -54,6 +54,22 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testSelectStarFromNestedTable(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmallNestedTupleDataStream(env).toTable(tEnv).select('*) + + val results = ds.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList("(1,1),one", "(2,2),two", "(3,3),three") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + @Test def testSelectFirst(): Unit = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala index 94ced19385aa3..58d3c635a66bd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala @@ -80,4 +80,13 @@ object StreamTestData { data.+=((5, 15L, 14, "KLM", 2L)) env.fromCollection(data) } + + def getSmallNestedTupleDataStream(env: StreamExecutionEnvironment): + DataStream[((Int, Int), String)] = { + val data = new mutable.MutableList[((Int, Int), String)] + data.+=(((1, 1), "one")) + data.+=(((2, 2), "two")) + data.+=(((3, 3), "three")) + env.fromCollection(data) + } }