From eb28519799c17d9a5cbaba109d42a5709c7df848 Mon Sep 17 00:00:00 2001
From: Shuyi Chen
Date: Fri, 21 Apr 2017 23:48:28 -0700
Subject: [PATCH] Add support for UNNEST in streaming SQL. Currently, only
 arrays are supported, and WITH ORDINALITY is not yet supported.

---
 docs/dev/table_api.md                              |   3 +
 .../flink/table/api/TableEnvironment.scala         |   3 +-
 .../table/calcite/FlinkTypeFactory.scala           |   5 +-
 .../utils/UserDefinedFunctionUtils.scala           |   4 +-
 .../flink/table/plan/nodes/FlinkRel.scala          |   3 +-
 .../table/plan/rules/FlinkRuleSets.scala           |   1 +
 .../datastream/DataStreamCorrelateRule.scala       |  14 ++-
 .../DataStreamCorrelateUnnestRule.scala            | 100 ++++++++++++++++++
 .../table/plan/util/ExplodeFunctionUtil.scala      |  76 +++++++++++++
 .../table/typeutils/TypeCheckUtils.scala           |   2 +-
 .../table/api/java/stream/sql/SqlITCase.java       |  28 +++++
 .../api/java/stream/utils/StreamTestData.java      |  13 +++
 .../api/scala/stream/sql/SqlITCase.scala           |  50 +++++++++
 .../scala/stream/utils/StreamTestData.scala        |  18 ++++
 14 files changed, 312 insertions(+), 8 deletions(-)
 create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateUnnestRule.scala
 create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala

diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2a838c7f1a42d..e4e7114e7db1d 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1420,6 +1420,8 @@ val result2 = tableEnv.sql(
 
 #### Limitations
 
+UNNEST currently only supports arrays, and WITH ORDINALITY is not supported yet.
+
 Joins, set operations, and non-windowed aggregations are not supported yet.
 
 {% top %}
@@ -1629,6 +1631,7 @@ tableReference:
 tablePrimary:
   [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
   | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
+  | UNNEST '(' expression ')'
 
 values:
   VALUES expression [, expression ]*
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2ddad45ae71c1..d54e2632c20c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, BasicArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
@@ -844,6 +844,7 @@ object TableEnvironment {
     val fieldNames: Array[String] = inputType match {
       case t: CompositeType[_] => t.getFieldNames
+      case _: BasicArrayTypeInfo[_, _] => Array("array0")
       case a: AtomicType[_] => Array("f0")
       case tpe => throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
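For illustration (not part of this patch): with the grammar extension above, an array
column can be flattened directly in a streaming SQL query. The table and column names
below are invented for the example; `toTable`, `registerTable`, and `sql` are the same
APIs exercised by the tests in this patch.

    // A stream of (id, tags) where tags is an array column (hypothetical names).
    val orders: Table = ordersStream.toTable(tableEnv).as('id, 'tags)
    tableEnv.registerTable("Orders", orders)

    // UNNEST expands each input row into one output row per array element.
    val flattened: Table = tableEnv.sql(
      "SELECT id, tag FROM Orders AS o, UNNEST(o.tags) AS T(tag)")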
" + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 22a5c9f74b99a..9a28bd1d081e5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ @@ -120,6 +120,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp case pa: PrimitiveArrayTypeInfo[_] => new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false) + case ba: BasicArrayTypeInfo[_, _] => + new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true) + case oa: ObjectArrayTypeInfo[_, _] => new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index c1cfe0610ae5e..a002a11d7ae0c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -344,7 +344,9 @@ object UserDefinedFunctionUtils { expected.isPrimitive && Primitives.wrap(expected) == candidate || candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt]) || candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) || - candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) + candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) || + (candidate.isArray && expected.isArray && + candidate.getComponentType.isInstanceOf[Object] && expected.getComponentType == classOf[Object]) @throws[Exception] def serialize(function: UserDefinedFunction): String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index 258d7f2af39c3..0a5fcab104bb1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -64,7 +64,8 @@ trait FlinkRel { val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable) val field = fa.getField.getName s"$referenceExpr.$field" - + case cv: RexCorrelVariable => + cv.toString case _ => throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr") } diff --git 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 41f095f032afb..fca57a391aa2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -182,6 +182,7 @@ object FlinkRuleSets {
     DataStreamUnionRule.INSTANCE,
     DataStreamValuesRule.INSTANCE,
     DataStreamCorrelateRule.INSTANCE,
+    DataStreamCorrelateUnnestRule.INSTANCE,
     StreamTableSourceScanRule.INSTANCE,
 
     // scan optimization
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index adce9f4516c98..4e91b95042382 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -17,14 +17,22 @@
  */
 package org.apache.flink.table.plan.rules.datastream
 
+import java.util
+
+import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind}
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
+import org.apache.calcite.rel.core.Uncollect
+import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
-import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamCorrelate}
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.util.ExplodeFunctionUtil
 
 /**
  * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateUnnestRule.scala
new file mode 100644
index 0000000000000..85dbc9d9109fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateUnnestRule.scala
@@ -0,0 +1,100 @@
+package org.apache.flink.table.plan.rules.datastream
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Uncollect
+import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamCorrelate}
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.util.ExplodeFunctionUtil
+
+/**
+ * Rule to convert a LogicalCorrelate with an Uncollect (UNNEST) input into a DataStreamCorrelate.
+ */
+class DataStreamCorrelateUnnestRule
+  extends ConverterRule(
+    classOf[LogicalCorrelate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCorrelateUnnestRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+    right match {
+      // a filter is pushed above the table function
+      case filter: LogicalFilter =>
+        filter
+          .getInput.asInstanceOf[RelSubset]
+          .getOriginal
+          .isInstanceOf[Uncollect]
+      case _: Uncollect => true
+      case _ => false
+    }
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
+    val right: RelNode = join.getInput(1)
+
+    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
+      relNode match {
+        case rel: RelSubset =>
+          convertToCorrelate(rel.getRelList.get(0), condition)
+
+        case filter: LogicalFilter =>
+          convertToCorrelate(
+            filter.getInput.asInstanceOf[RelSubset].getOriginal,
+            Some(filter.getCondition))
+
+        case unCollect: Uncollect =>
+          val arrayRelDataType = unCollect.getInput(0).getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType]
+          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+            "explode",
+            ExplodeFunctionUtil.explodeTableFuncFromType(arrayRelDataType.typeInfo),
+            FlinkTypeFactory.toTypeInfo(arrayRelDataType.getComponentType),
+            rel.getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
+
+          val rexCall = rel.getCluster.getRexBuilder.makeCall(
+            explodeTableFunc.head,
+            unCollect.getInput(0).asInstanceOf[RelSubset].getRelList.get(0).getChildExps)
+          val func = LogicalTableFunctionScan.create(
+            rel.getCluster,
+            new util.ArrayList[RelNode](),
+            rexCall,
+            classOf[Array[Object]],
+            new RelRecordType(StructKind.FULLY_QUALIFIED, ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, unCollect.getInput(0).getRowType.getFieldList.get(0).getValue.getComponentType))),
+            null)
+          new DataStreamCorrelate(
+            rel.getCluster,
+            traitSet,
+            convInput,
+            func,
+            condition,
+            rel.getRowType,
+            join.getRowType,
+            join.getJoinType,
+            description)
+      }
+    }
+    convertToCorrelate(right, None)
+  }
+
+}
+
+object DataStreamCorrelateUnnestRule {
+  val INSTANCE: RelOptRule = new DataStreamCorrelateUnnestRule
+}
+
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
new file mode 100644
index 0000000000000..7b23a47c79c6f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -0,0 +1,76 @@
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.functions.TableFunction
+
+/**
+ * Table functions that explode an array into one row per element, used to implement UNNEST.
+ */
+class ObjectExplodeTableFunc extends TableFunction[Object] {
+  def eval(arr: Array[Object]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class FloatExplodeTableFunc extends TableFunction[Float] {
+  def eval(arr: Array[Float]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ShortExplodeTableFunc extends TableFunction[Short] {
+  def eval(arr: Array[Short]): Unit = {
+    arr.foreach(collect)
+  }
+}
+class IntExplodeTableFunc extends TableFunction[Int] {
+  def eval(arr: Array[Int]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class LongExplodeTableFunc extends TableFunction[Long] {
+  def eval(arr: Array[Long]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class DoubleExplodeTableFunc extends TableFunction[Double] {
+  def eval(arr: Array[Double]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ByteExplodeTableFunc extends TableFunction[Byte] {
+  def eval(arr: Array[Byte]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class BooleanExplodeTableFunc extends TableFunction[Boolean] {
+  def eval(arr: Array[Boolean]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+object ExplodeFunctionUtil {
+  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = {
+    ti match {
+      case pati: PrimitiveArrayTypeInfo[_] => {
+        pati.getComponentType match {
+          case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+          case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+          case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
+          case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
+          case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
+          case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
+          case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
+        }
+      }
+      case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _ => throw new UnsupportedOperationException(ti.toString + " is not supported")
+    }
+  }
+}
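How the utility above is used: the planner asks for a table function matching the
array's element type and generates a call to it; at runtime each eval call emits one
row per element via collect. A sketch against the patch's own API (the behavior is
described in comments rather than executed, since collect needs a running operator
context):

    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo
    import org.apache.flink.table.plan.util.ExplodeFunctionUtil

    // Selects IntExplodeTableFunc for an int[] column.
    val explode = ExplodeFunctionUtil.explodeTableFuncFromType(
      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)

    // Inside the correlate operator, eval(Array(12, 45)) calls collect(12)
    // and collect(45), producing two rows that are joined back to the input row.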
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 40f0cf298b90f..5e80b8f05d0d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -65,7 +65,7 @@ object TypeCheckUtils {
   def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
 
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
+    case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] | _: BasicArrayTypeInfo[_, _] => true
     case _ => false
   }
 
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 7c01d2bbfe618..cb9a38092c1b4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -61,6 +61,34 @@ public void testSelect() throws Exception {
 		StreamITCase.compareWithList(expected);
 	}
 
+	@Test
+	public void testUnnest() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, Integer[]>> ds = StreamTestData.get3TupleDataSetWithArray(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT * FROM MyTable as mt, unnest(mt.c) as T(s)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,[121, 432],121");
+		expected.add("1,1,[121, 432],432");
+		expected.add("2,2,[45, 65],45");
+		expected.add("2,2,[45, 65],65");
+		expected.add("3,2,[121, 453],121");
+		expected.add("3,2,[121, 453],453");
+
+		StreamITCase.compareWithList(expected);
+	}
+
 	@Test
 	public void testFilter() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
index 139801fa1d5c0..62e61dd969ab1 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api.java.stream.utils;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -41,6 +42,18 @@ public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(St
 		return env.fromCollection(data);
 	}
 
+	public static DataStream<Tuple3<Integer, Long, Integer[]>> get3TupleDataSetWithArray(StreamExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, Integer[]>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, ImmutableList.of(121, 432).toArray(new Integer[2])));
+		data.add(new Tuple3<>(2, 2L, ImmutableList.of(45, 65).toArray(new Integer[2])));
+		data.add(new Tuple3<>(3, 2L, ImmutableList.of(121, 453).toArray(new Integer[2])));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
 	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
 
 		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
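Note on the test data above: the Java test uses boxed Integer[] on purpose, so that
type extraction yields a BasicArrayTypeInfo (the case this patch adds), not a
PrimitiveArrayTypeInfo. A sketch of the distinction, assuming the standard behavior
of Flink's TypeExtractor:

    import org.apache.flink.api.java.typeutils.TypeExtractor

    // java.lang.Integer[] -> BasicArrayTypeInfo (boxed, nullable elements)
    val boxed = TypeExtractor.getForClass(classOf[Array[java.lang.Integer]])

    // int[] -> PrimitiveArrayTypeInfo (the Scala test below covers this path)
    val primitive = TypeExtractor.getForClass(classOf[Array[Int]])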
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0f455a5..deadd68e66587 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -68,6 +68,55 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, s FROM MyTable as mt, UNNEST(mt.b) as T(s)"
+
+    val t = StreamTestData.getSmall2TupleDataStreamWithPrimitiveArray(env).toTable(tEnv).as('a, 'b)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,[12, 45],12",
+      "1,[12, 45],45",
+      "2,[41, 5],41",
+      "2,[41, 5],5",
+      "3,[18, 42],18",
+      "3,[18, 42],42"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, s, t FROM MyTable as mt, UNNEST(mt.b) as T(s, t) where s > 13"
+
+    val t = StreamTestData.getSmall2TupleDataStreamWithTupleArray(env).toTable(tEnv).as('a, 'b)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,[(13,41.6), (14,45.2136)],14,45.2136",
+      "3,[(18,42.6)],18,42.6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   /** test filtering with registered table **/
   @Test
   def testSimpleFilter(): Unit = {
@@ -1155,4 +1204,5 @@ object SqlITCase {
 
     override def cancel(): Unit = ???
   }
+
 }
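The second test above unnests an array of Tuple2 elements: SQL sees the element type
as a ROW, so the UNNEST alias binds one name per field and the WHERE clause can filter
on them. A worked trace of that test's data (illustration only):

    val sqlQuery = "SELECT a, b, s, t FROM MyTable as mt, " +
      "UNNEST(mt.b) as T(s, t) where s > 13"
    // Input row:   (2, Array((13, "41.6"), (14, "45.2136")))
    // Per element: (s=13, t="41.6") and (s=14, t="45.2136"); the filter keeps
    // only s=14, so the emitted row is "2,[(13,41.6), (14,45.2136)],14,45.2136".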
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
index 6745039f1df05..56948d3d7cd34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
@@ -33,6 +33,24 @@ object StreamTestData {
     env.fromCollection(data)
   }
 
+  def getSmall2TupleDataStreamWithTupleArray(env: StreamExecutionEnvironment): DataStream[(Int, Array[Tuple2[Int, String]])] = {
+    val data = new mutable.MutableList[(Int, Array[Tuple2[Int, String]])]
+    data.+=((1, Array((12, "45.6"), (12, "45.612"))))
+    data.+=((2, Array((13, "41.6"), (14, "45.2136"))))
+    data.+=((3, Array((18, "42.6"))))
+
+    env.fromCollection(data)
+  }
+
+  def getSmall2TupleDataStreamWithPrimitiveArray(env: StreamExecutionEnvironment): DataStream[(Int, Array[Short])] = {
+    val data = new mutable.MutableList[(Int, Array[Short])]
+    data.+=((1, Array(12, 45)))
+    data.+=((2, Array(41, 5)))
+    data.+=((3, Array(18, 42)))
+
+    env.fromCollection(data)
+  }
+
   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))