From 0064a9ed1270d1e139ff9a6b22df5cc95450ce59 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Wed, 15 Mar 2017 13:24:42 +0100 Subject: [PATCH] [FLINK-6059] [table] Reject GenericType when converting DataSet or DataStream to Table. --- .../flink/table/api/TableEnvironment.scala | 13 +++++++- .../java/batch/TableEnvironmentITCase.java | 31 +++++++++++++++++++ .../flink/table/TableEnvironmentTest.scala | 17 ++++++++-- .../scala/batch/TableEnvironmentITCase.scala | 30 ++++++++++++++++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 1dda3a89c1aec7..a23db1863650d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -479,7 +479,14 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { - (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + + if (inputType.isInstanceOf[GenericTypeInfo[_]] && inputType.getTypeClass == classOf[Row]) { + throw new TableException( + "Row with GenericTypeInfo cannot be converted to Table. " + + "Please specify RowTypeInfo explicitly.") + } else { + (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + } } /** @@ -498,6 +505,10 @@ abstract class TableEnvironment(val config: TableConfig) { TableEnvironment.validateType(inputType) val indexedNames: Array[(Int, String)] = inputType match { + case g: GenericTypeInfo[_] if g.getTypeClass == classOf[Row] => + throw new TableException( + "Row with GenericTypeInfo cannot be converted to Table. 
" + + "Please specify RowTypeInfo explicitly.") case a: AtomicType[A] => if (exprs.length != 1) { throw new TableException("Table of atomic type can only have a single field.") diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index ebe79fa840cfae..92cb80d0125a77 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -28,6 +28,7 @@ import org.apache.calcite.tools.RuleSets; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; @@ -46,6 +47,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { @@ -413,6 +416,34 @@ public void testAsWithPojoAndGenericTypes() throws Exception { compareResultAsText(results, expected); } + @Test(expected = TableException.class) + public void testGenericRow() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // use null value to enforce GenericType + DataSet dataSet = env.fromElements(Row.of(1, 2L, "Hello", null)); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. Cannot import DataSet with GenericTypeInfo. 
+ tableEnv.fromDataSet(dataSet); + } + + @Test(expected = TableException.class) + public void testGenericRowWithAlias() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // use null value to enforce GenericType + DataSet dataSet = env.fromElements(Row.of((Integer)null)); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet, "nullField"); + } + @Test(expected = TableException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 05c2a491f3ca01..6554bc00ebca2f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -23,13 +23,14 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException} import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource import org.apache.flink.table.utils.TableTestBase -import 
org.apache.flink.table.utils.TableTestUtil.{batchTableNode, term, unaryNode, binaryNode, streamTableNode} +import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode} +import org.apache.flink.types.Row import org.junit.Test import org.junit.Assert.assertEquals @@ -48,6 +49,8 @@ class TableEnvironmentTest extends TableTestBase { val atomicType = INT_TYPE_INFO + val genericRowType = new GenericTypeInfo[Row](classOf[Row]) + @Test def testGetFieldInfoTuple(): Unit = { val fieldInfo = tEnv.getFieldInfo(tupleType) @@ -80,6 +83,11 @@ class TableEnvironmentTest extends TableTestBase { fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1)) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRow(): Unit = { + tEnv.getFieldInfo(genericRowType) + } + @Test def testGetFieldInfoTupleNames(): Unit = { val fieldInfo = tEnv.getFieldInfo( @@ -280,6 +288,11 @@ class TableEnvironmentTest extends TableTestBase { )) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRowAlias(): Unit = { + tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first"))) + } + @Test def testSqlWithoutRegisteringForBatchTables(): Unit = { val util = batchTestUtil() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index 6cbe83402aae59..e61e1907e76c45 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch import java.util +import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ 
import org.apache.flink.api.scala.util.CollectionDataSets @@ -31,6 +32,7 @@ import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized +import org.junit.Assert.assertTrue import scala.collection.JavaConverters._ @@ -254,6 +256,34 @@ class TableEnvironmentITCase( CollectionDataSets.get3TupleDataSet(env) .toTable(tEnv, 'a as 'foo, 'b, 'c) } + + @Test(expected = classOf[TableException]) + def testGenericRow() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val dataSet = env.fromElements(Row.of(null)) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet) + } + + @Test(expected = classOf[TableException]) + def testGenericRowWithAlias() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val dataSet = env.fromElements(Row.of(null)) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet, "nullField") + } } object TableEnvironmentITCase {