Skip to content

Commit

Permalink
[FLINK-6059] [table] Reject GenericType<Row> when converting DataSet …
Browse files Browse the repository at this point in the history
…or DataStream to Table.
  • Loading branch information
fhueske committed Mar 15, 2017
1 parent 31ab4b2 commit 0064a9e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
Expand Up @@ -479,7 +479,14 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
(Array[String], Array[Int]) = {
(TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))

if (inputType.isInstanceOf[GenericTypeInfo[Row]]) {
throw new TableException(
"Row with GenericTypeInfo cannot be converted to Table. " +
"Please specify RowTypeInfo explicitly.")
} else {
(TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
}
}

/**
Expand All @@ -498,6 +505,10 @@ abstract class TableEnvironment(val config: TableConfig) {
TableEnvironment.validateType(inputType)

val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[Row] =>
throw new TableException(
"Row with GenericTypeInfo cannot be converted to Table. " +
"Please specify RowTypeInfo explicitly.")
case a: AtomicType[A] =>
if (exprs.length != 1) {
throw new TableException("Table of atomic type can only have a single field.")
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.calcite.tools.RuleSets;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
Expand All @@ -46,6 +47,8 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {

Expand Down Expand Up @@ -413,6 +416,34 @@ public void testAsWithPojoAndGenericTypes() throws Exception {
compareResultAsText(results, expected);
}

@Test(expected = TableException.class)
public void testGenericRow() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

// use null value the enforce GenericType
DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
assertTrue(dataSet.getType() instanceof GenericTypeInfo);
assertTrue(dataSet.getType().getTypeClass().equals(Row.class));

// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet);
}

@Test(expected = TableException.class)
public void testGenericRowWithAlias() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

// use null value the enforce GenericType
DataSet<Row> dataSet = env.fromElements(Row.of((Integer)null));
assertTrue(dataSet.getType() instanceof GenericTypeInfo);
assertTrue(dataSet.getType().getTypeClass().equals(Row.class));

// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet, "nullField");
}

@Test(expected = TableException.class)
public void testAsWithToFewFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand Down
Expand Up @@ -23,13 +23,14 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, term, unaryNode, binaryNode, streamTableNode}
import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.types.Row
import org.junit.Test
import org.junit.Assert.assertEquals

Expand All @@ -48,6 +49,8 @@ class TableEnvironmentTest extends TableTestBase {

val atomicType = INT_TYPE_INFO

val genericRowType = new GenericTypeInfo[Row](classOf[Row])

@Test
def testGetFieldInfoTuple(): Unit = {
val fieldInfo = tEnv.getFieldInfo(tupleType)
Expand Down Expand Up @@ -80,6 +83,11 @@ class TableEnvironmentTest extends TableTestBase {
fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
}

@Test(expected = classOf[TableException])
def testGetFieldInfoGenericRow(): Unit = {
tEnv.getFieldInfo(genericRowType)
}

@Test
def testGetFieldInfoTupleNames(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
Expand Down Expand Up @@ -280,6 +288,11 @@ class TableEnvironmentTest extends TableTestBase {
))
}

@Test(expected = classOf[TableException])
def testGetFieldInfoGenericRowAlias(): Unit = {
tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
}

@Test
def testSqlWithoutRegisteringForBatchTables(): Unit = {
val util = batchTestUtil()
Expand Down
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch

import java.util

import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
Expand All @@ -31,6 +32,7 @@ import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.Assert.assertTrue

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -254,6 +256,34 @@ class TableEnvironmentITCase(
CollectionDataSets.get3TupleDataSet(env)
.toTable(tEnv, 'a as 'foo, 'b, 'c)
}

@Test(expected = classOf[TableException])
def testGenericRow() {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env, config)

// use null value the enforce GenericType
val dataSet = env.fromElements(Row.of(null))
assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
assertTrue(dataSet.getType().getTypeClass == classOf[Row])

// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet)
}

@Test(expected = classOf[TableException])
def testGenericRowWithAlias() {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env, config)

// use null value the enforce GenericType
val dataSet = env.fromElements(Row.of(null))
assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
assertTrue(dataSet.getType().getTypeClass == classOf[Row])

// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet, "nullField")
}
}

object TableEnvironmentITCase {
Expand Down

0 comments on commit 0064a9e

Please sign in to comment.