From 9b40f85e9ebbe2524b440d6abc9722380467266c Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Sep 2016 17:05:27 +0800 Subject: [PATCH 1/2] [FLINK-4686] [table] Add possibility to get column names --- .../org/apache/flink/api/table/Schema.scala | 89 +++++++++++++++++++ .../apache/flink/api/table/exceptions.scala | 5 ++ .../org/apache/flink/api/table/table.scala | 11 +++ .../flink/api/java/batch/SchemaTest.java | 84 +++++++++++++++++ 4 files changed, 189 insertions(+) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala create mode 100644 flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala new file mode 100644 index 0000000000000..7f35bc5b8da44 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table + +import org.apache.flink.api.common.typeinfo.TypeInformation + +/** + * A Schema represents a Table's structure + */ +class Schema( + val columnNames: Array[String], + val columnTypes: Array[TypeInformation[_]]) { + + if (columnNames.length != columnTypes.length) { + throw new TableException( + "Number of column names and column types must be equal.") + } + + // check uniqueness of field names + if (columnNames.toSet.size != columnTypes.length) { + throw new TableException( + "Table column names must be unique.") + } + + val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap + + /** + * Returns all type information as an array + */ + def getTypes: Array[TypeInformation[_]] = columnTypes + + /** + * Returns the specified type information for the given column index + * + * @param columnIndex the index of the field + */ + def getType(columnIndex: Int): TypeInformation[_] = columnTypes(columnIndex) + + /** + * Returns the specified type information for the given column name + * + * @param columnName the name of the field + */ + def getType(columnName: String): TypeInformation[_] = { + if (columnNameToIndex.contains(columnName)) { + columnTypes(columnNameToIndex(columnName)) + } else { + throw FieldNotFoundException(s"Table doesn't have the column : $columnName") + } + } + + /** + * Returns all column names as an array. + */ + def getColumnNames: Array[String] = columnNames + + /** + * Return the specified column name for the given column index. 
+ * + * @param columnIndex the index of the field + */ + def getColumnName(columnIndex: Int): String = columnNames(columnIndex) + + + override def toString = { + val builder = new StringBuilder + builder.append("root\n") + columnNames.zip(columnTypes).foreach{ case (name, typeInfo) => + builder.append(s" |-- $name: $typeInfo\n") + } + builder.toString() + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala index 773e25607990b..f8d0cdb8eaeff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala @@ -60,3 +60,8 @@ object ValidationException { * Exception for unwanted method calling on unresolved expression. */ case class UnresolvedException(msg: String) extends RuntimeException(msg) + +/** + * Exception for the table doesn't have a field of a specified name. + */ +case class FieldNotFoundException(msg: String) extends RuntimeException(msg) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 9d96780d12c20..450e5b3067e53 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -65,6 +65,17 @@ class Table( def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) + /** + * Returns the schema of this Table + */ + def schema: Schema = new Schema(logicalPlan.output.map(_.name).toArray, + logicalPlan.output.map(_.resultType).toArray) + + /** + * Print the schema to the console in a nice tree format + */ + def printSchema(): Unit = print(schema.toString) + /** * Performs a selection operation. Similar to an SQL SELECT statement. 
The field expressions * can contain complex expressions and aggregations. diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java new file mode 100644 index 0000000000000..f71cd29dab171 --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.batch; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.FieldNotFoundException; +import org.apache.flink.api.table.Schema; +import org.apache.flink.api.table.TableException; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SchemaTest { + + @Test + public void testSchema() { + String[] fieldNames = new String[]{"a", "b"}; + TypeInformation[] typeInfos = new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + Schema s = new Schema(fieldNames, typeInfos); + + assertArrayEquals(fieldNames, s.getColumnNames()); + assertArrayEquals(typeInfos, s.getTypes()); + + assertEquals("b", s.getColumnName(1)); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, s.getType(1)); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, s.getType("a")); + + String expectedSchemaString = "root\n" + + " |-- a: Integer\n" + + " |-- b: String\n"; + assertEquals(expectedSchemaString, s.toString()); + + try { + s.getColumnName(3); + fail("out of index, should throw exception"); + } catch (Exception e) { + assertTrue(e instanceof IndexOutOfBoundsException); + } + + try { + s.getType(-1); + fail("out of index, should throw exception"); + } catch (Exception e) { + assertTrue(e instanceof IndexOutOfBoundsException); + } + + try { + s.getType("c"); + fail("non-exist field name, should throw exception"); + } catch (Exception e) { + assertTrue(e instanceof FieldNotFoundException); + } + } + + @Test(expected = TableException.class) + public void testInvalidSchema() { + String[] fieldNames = new String[]{"a", "b", "c"}; + TypeInformation[] typeInfos = new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + Schema s = new Schema(fieldNames, typeInfos); + } + + +} From 68823a74ad3c53d353f6c8d993967aab3b03e328 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Thu, 29 Sep 2016 
12:16:17 +0800 Subject: [PATCH 2/2] review comments addressed --- .../org/apache/flink/api/table/Schema.scala | 29 +++++++++++------- .../apache/flink/api/table/exceptions.scala | 5 ---- .../org/apache/flink/api/table/table.scala | 8 +++-- .../flink/api/java/batch/SchemaTest.java | 30 ++++--------------- .../api/scala/batch/sql/JoinITCase.scala | 4 +++ .../api/scala/batch/sql/SelectITCase.scala | 4 +++ 6 files changed, 38 insertions(+), 42 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala index 7f35bc5b8da44..25a011e86401e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala @@ -23,8 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation * A Schema represents a Table's structure */ class Schema( - val columnNames: Array[String], - val columnTypes: Array[TypeInformation[_]]) { + private val columnNames: Array[String], + private val columnTypes: Array[TypeInformation[_]]) { if (columnNames.length != columnTypes.length) { throw new TableException( @@ -46,21 +46,25 @@ class Schema( /** * Returns the specified type information for the given column index - * * @param columnIndex the index of the field */ - def getType(columnIndex: Int): TypeInformation[_] = columnTypes(columnIndex) + def getType(columnIndex: Int): Option[TypeInformation[_]] = { + if (columnIndex < 0 || columnIndex >= columnNames.length) { + None + } else { + Some(columnTypes(columnIndex)) + } + } /** * Returns the specified type information for the given column name - * * @param columnName the name of the field */ - def getType(columnName: String): TypeInformation[_] = { + def getType(columnName: String): Option[TypeInformation[_]] = { if (columnNameToIndex.contains(columnName)) { - 
columnTypes(columnNameToIndex(columnName)) + Some(columnTypes(columnNameToIndex(columnName))) } else { - throw FieldNotFoundException(s"Table doesn't have the column : $columnName") + None } } @@ -71,10 +75,15 @@ class Schema( /** * Return the specified column name for the given column index. - * * @param columnIndex the index of the field */ - def getColumnName(columnIndex: Int): String = columnNames(columnIndex) + def getColumnName(columnIndex: Int): Option[String] = { + if (columnIndex < 0 || columnIndex >= columnNames.length) { + None + } else { + Some(columnNames(columnIndex)) + } + } override def toString = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala index f8d0cdb8eaeff..773e25607990b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala @@ -60,8 +60,3 @@ object ValidationException { * Exception for unwanted method calling on unresolved expression. */ case class UnresolvedException(msg: String) extends RuntimeException(msg) - -/** - * Exception for the table doesn't have a field of a specified name. 
- */ -case class FieldNotFoundException(msg: String) extends RuntimeException(msg) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 450e5b3067e53..3ef7fc466c0ae 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -65,16 +65,18 @@ class Table( def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) + val tableSchema: Schema = new Schema(logicalPlan.output.map(_.name).toArray, + logicalPlan.output.map(_.resultType).toArray) + /** * Returns the schema of this Table */ - def schema: Schema = new Schema(logicalPlan.output.map(_.name).toArray, - logicalPlan.output.map(_.resultType).toArray) + def schema: Schema = tableSchema /** * Print the schema to the console in a nice tree format */ - def printSchema(): Unit = print(schema.toString) + def printSchema(): Unit = print(tableSchema.toString) /** * Performs a selection operation. Similar to an SQL SELECT statement. 
The field expressions diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java index f71cd29dab171..d86cf0da8d6b3 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/SchemaTest.java @@ -19,7 +19,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.FieldNotFoundException; import org.apache.flink.api.table.Schema; import org.apache.flink.api.table.TableException; import org.junit.Test; @@ -40,35 +39,18 @@ public void testSchema() { assertArrayEquals(fieldNames, s.getColumnNames()); assertArrayEquals(typeInfos, s.getTypes()); - assertEquals("b", s.getColumnName(1)); - assertEquals(BasicTypeInfo.STRING_TYPE_INFO, s.getType(1)); - assertEquals(BasicTypeInfo.INT_TYPE_INFO, s.getType("a")); + assertEquals("b", s.getColumnName(1).get()); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, s.getType(1).get()); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, s.getType("a").get()); String expectedSchemaString = "root\n" + " |-- a: Integer\n" + " |-- b: String\n"; assertEquals(expectedSchemaString, s.toString()); - try { - s.getColumnName(3); - fail("out of index, should throw exception"); - } catch (Exception e) { - assertTrue(e instanceof IndexOutOfBoundsException); - } - - try { - s.getType(-1); - fail("out of index, should throw exception"); - } catch (Exception e) { - assertTrue(e instanceof IndexOutOfBoundsException); - } - - try { - s.getType("c"); - fail("non-exist field name, should throw exception"); - } catch (Exception e) { - assertTrue(e instanceof FieldNotFoundException); - } + assertTrue(s.getColumnName(3).isEmpty()); + assertTrue(s.getType(-1).isEmpty()); + assertTrue(s.getType("c").isEmpty()); } 
@Test(expected = TableException.class) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala index 6a02fd47cc562..d657efa55cedb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment, TableException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils +import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -53,6 +54,9 @@ class JoinITCase( val result = tEnv.sql(sqlQuery) + // test for schema + assertEquals("root\n |-- c: String\n |-- g: String\n", result.schema.toString) + val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala index 07b802d9945c3..7053cfc85f5bd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala @@ -29,6 +29,7 @@ import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized +import org.junit.Assert.assertEquals import scala.collection.JavaConverters._ @@ -49,6 +50,9 @@ class 
SelectITCase( val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", ds) + // test for schema + assertEquals("root\n |-- a: Integer\n |-- b: Long\n |-- c: String\n", ds.schema.toString) + val result = tEnv.sql(sqlQuery) val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +