From 555828aa6a257921f52fb87b8ab5f2afff78eee0 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Tue, 5 Sep 2017 08:55:03 +0800 Subject: [PATCH] [FLINK-7572][table]Improve TableSchema and FlinkTable validation exception message --- .../apache/flink/table/api/TableSchema.scala | 15 ++++++-- .../flink/table/plan/schema/FlinkTable.scala | 12 +++++-- .../validation/FlinkTableValidationTest.scala | 36 +++++++++++++++++++ .../TableSchemaValidationTest.scala | 28 +++++++++++++-- 4 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala index a67a07af106f0..e183d34b512e0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala @@ -28,17 +28,28 @@ class TableSchema( if (columnNames.length != columnTypes.length) { throw new TableException( - "Number of column indexes and column names must be equal.") + s"Number of column indexes and column names must be equal." + + s"\nColumn names count is [${columnNames.length}]" + + s"\nColumn types count is [${columnTypes.length}]" + + s"\nColumn names:${columnNames.mkString("[ ", ", ", " ]")}" + + s"\nColumn types:${columnTypes.mkString("[ ", ", ", " ]")}") } // check uniqueness of field names if (columnNames.toSet.size != columnTypes.length) { + val columnNameBuffer = columnNames.toBuffer + val duplicate = columnNames.filter( + name => columnNameBuffer.-=(name).contains(name)) + throw new TableException( - "Table column names must be unique.") + s"Table column names must be unique." + + s"\nThe duplicate columns are: ${duplicate.mkString("[ ", ", ", " ]")}" + + s"\nAll column names: ${columnNames.mkString("[ ", ", ", " ]")}") } val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap + val columnNameToString:Seq[String] = columnNames.toSeq /** * Returns all type information as an array. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index df56ae6ec2405..430fad9ec9cbb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -37,13 +37,21 @@ abstract class FlinkTable[T]( if (fieldIndexes.length != fieldNames.length) { throw new TableException( - "Number of field indexes and field names must be equal.") + s"Number of field indexes and field names must be equal. " + + s"\nField names count is [${fieldNames.length}]" + + s"\nField indexs count is [${fieldIndexes.length}]" + + s"\nField names: ${fieldNames.mkString("[ ", ", ", " ]")}" + + s"\nField indexs: ${fieldIndexes.mkString("[ ", ", ", " ]")}") } // check uniqueness of field names if (fieldNames.length != fieldNames.toSet.size) { + val fieldNameBuffer = fieldNames.toBuffer + val duplicate = fieldNames.filter(name => fieldNameBuffer.-=(name).contains(name)) throw new TableException( - "Table field names must be unique.") + s"Table field names must be unique." + + s"\nThe duplicate fields are: ${duplicate.mkString("[ ", ", ", " ]")}" + + s"\nAll field names: ${fieldNames.mkString("[ ", ", ", " ]")}") } val fieldTypes: Array[TypeInformation[_]] = diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala new file mode 100644 index 0000000000000..6345976628a68 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +class FlinkTableValidationTest extends TableTestBase { + @Test + def testFieldNamesDuplicate() { + thrown.expect(classOf[TableException]) + thrown.expectMessage("Table field names must be unique." + + "\nThe duplicate fields are: [ a ]" + + "\nAll field names: [ a, a, b ]") + val util = batchTestUtil() + util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b) + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala index 1a7815aa1b257..ab0f9a038380e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala @@ -24,12 +24,36 @@ import org.junit.Test class TableSchemaValidationTest extends TableTestBase { - @Test(expected = classOf[TableException]) - def testInvalidSchema() { + @Test + def testColumnNameAndColumnTypeNotEqual() { + thrown.expect(classOf[TableException]) + thrown.expectMessage( + "Number of column indexes and column names must be equal." + + "\nColumn names count is [3]" + + "\nColumn types count is [2]" + + "\nColumn names:[ a, b, c ]" + + "\nColumn types:[ Integer, String ]") + val fieldNames = Array("a", "b", "c") val typeInfos: Array[TypeInformation[_]] = Array( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) new TableSchema(fieldNames, typeInfos) } + + @Test + def testColumnNamesDuplicate() { + thrown.expect(classOf[TableException]) + thrown.expectMessage( + "Table column names must be unique." + + "\nThe duplicate columns are: [ a ]" + + "\nAll column names: [ a, a, c ]") + + val fieldNames = Array("a", "a", "c") + val typeInfos: Array[TypeInformation[_]] = Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO) + new TableSchema(fieldNames, typeInfos) + } }