diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index c6f14f3523812..c4bc1be6436d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -65,6 +65,9 @@ trait ImplicitExpressionOperations {
 
   def as(name: Symbol) = Naming(expr, name.name)
 
+  def asc = Asc(expr)
+  def desc = Desc(expr)
+
   /**
    * Conditional operator that decides which of two other expressions should be evaluated
    * based on a evaluated boolean condition.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index a20a8e95b58d2..ffadca506a908 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -52,6 +52,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val EVAL: Keyword = Keyword("eval")
+  lazy val ASC: Keyword = Keyword("asc")
+  lazy val DESC: Keyword = Keyword("desc")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -124,6 +126,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixIsNotNull: PackratParser[Expression] =
     composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
 
+  lazy val suffixAsc: PackratParser[Expression] =
+    (atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) })
+
+  lazy val suffixDesc: PackratParser[Expression] =
+    (atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) })
+
   lazy val suffixSum: PackratParser[Expression] =
     composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
 
@@ -181,7 +189,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val suffixed: PackratParser[Expression] =
     suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
-    suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall
+    suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall |
+    suffixAsc | suffixDesc
 
   // prefix operators
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
new file mode 100644
index 0000000000000..75fa07829ef07
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+abstract class Ordering extends UnaryExpression { self: Product =>
+}
+
+case class Asc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).asc"
+
+  override def name: String = child.name + "-asc"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // ascending is the default collation, so the child's RexNode is used as is
+    child.toRexNode
+  }
+}
+
+case class Desc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).desc"
+
+  override def name: String = child.name + "-desc"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.desc(child.toRexNode)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 715109e6b0724..b18d6747be444 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -81,31 +81,14 @@ abstract class BatchScan(
       // conversion
       if (determinedType != inputType) {
 
-        val generator = new CodeGenerator(
-          config,
-          input.getType,
-          flinkTable.fieldIndexes)
-        val conversion = generator.generateConverterResultExpression(
+        val mapFunc = getConversionMapper(
+          config,
+          inputType,
           determinedType,
-          getRowType.getFieldNames)
-
-        val body =
-          s"""
-             |${conversion.code}
-             |return ${conversion.resultTerm};
-             |""".stripMargin
-
-        val genFunction = generator.generateFunction(
           "DataSetSourceConversion",
-          classOf[MapFunction[Any, Any]],
-          body,
-          determinedType)
-
-        val mapFunc = new MapRunner[Any, Any](
-          genFunction.name,
-          genFunction.code,
-          genFunction.returnType)
+          getRowType.getFieldNames,
+          Some(flinkTable.fieldIndexes))
 
         val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 12095a268de55..114122bd020aa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -143,7 +143,12 @@ class DataSetAggregate(
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
         val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(typeConversion(config, rowTypeInfo, expectedType.get))
+        result.map(getConversionMapper(config,
+          rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType.get,
+          "AggregateOutputConversion",
+          rowType.getFieldNames.asScala
+        ))
           .name(mapName)
       case _ => result
     }
@@ -180,32 +185,4 @@ class DataSetAggregate(
}.mkString(", ") } - private def typeConversion( - config: TableConfig, - rowTypeInfo: RowTypeInfo, - expectedType: TypeInformation[Any]): MapFunction[Any, Any] = { - - val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]]) - val conversion = generator.generateConverterResultExpression( - expectedType, rowType.getFieldNames.asScala) - - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin - - val genFunction = generator.generateFunction( - "AggregateOutputConversion", - classOf[MapFunction[Any, Any]], - body, - expectedType) - - new MapRunner[Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - } - } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index e8f81fd53cb8f..b786896781d8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -21,10 +21,13 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig} +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment} import org.apache.flink.api.table.plan.nodes.FlinkRel +import org.apache.flink.api.table.runtime.MapRunner import scala.collection.JavaConversions._ @@ -64,4 +67,39 @@ trait DataSetRel extends RelNode with FlinkRel { } + private[dataset] def getConversionMapper( + config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Any], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Any] = { + + val generator = new CodeGenerator( + config, + inputType, + None, + inputPojoFieldMapping) + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + conversionOperatorName, + classOf[MapFunction[Any, Any]], + body, + expectedType) + + new MapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala index 17a7db21d2599..a75131847a8fe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.BatchTableEnvironment import org.apache.flink.api.table.plan.schema.DataSetTable + /** * Flink RelNode which matches along with 
DataSource. * It ensures that types without deterministic field order (e.g. POJOs) are not part of diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala new file mode 100644 index 0000000000000..ef89b06a9e6cb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.typeutils.PojoTypeInfo +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConverters._ + +class DataSetSort( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inp: RelNode, + collations: RelCollation, + rowType2: RelDataType) + extends SingleRel(cluster, traitSet, inp) + with DataSetRel{ + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ + new DataSetSort( + cluster, + traitSet, + inputs.get(0), + collations, + rowType2 + ) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = { + + val config = tableEnv.getConfig + + val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + + val currentParallelism = inputDS.getExecutionEnvironment.getParallelism + var partitionedDs = if (currentParallelism == 1) { + inputDS + } else { + inputDS.partitionByRange(fieldCollations.map(_._1): _*) + .withOrders(fieldCollations.map(_._2): _*) + } + + fieldCollations.foreach { fieldCollation => + partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) + } + + val inputType = partitionedDs.getType + expectedType match { + + case None if config.getEfficientTypeUsage => + partitionedDs + + case _ => + val determinedType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + // conversion + if (determinedType != inputType) { + + val mapFunc = getConversionMapper(config, + partitionedDs.getType, + determinedType, + "DataSetSortConversion", + getRowType.getFieldNames.asScala + ) + 
+          partitionedDs.map(mapFunc)
+        }
+        // no conversion necessary, forward
+        else {
+          partitionedDs
+        }
+    }
+  }
+
+  private def directionToOrder(direction: Direction) = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+    }
+  }
+
+  private val fieldCollations = collations.getFieldCollations.asScala
+    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+    .map(col => s"${rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}").mkString(", ")
+
+  override def toString: String = s"Sort(by: $sortFieldsToString)"
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("orderBy", sortFieldsToString)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 06a8a8413da76..5d5912bed78c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -102,6 +102,7 @@ object FlinkRuleSets {
     DataSetJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE,
+    DataSetSortRule.INSTANCE,
     DataSetValuesRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE
   )
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
new file mode 100644
index 0000000000000..b7f70e33f33af
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetSortRule") {
+
+  /**
+   * Only translate when neither OFFSET nor LIMIT is specified.
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val sort = call.rel(0).asInstanceOf[LogicalSort]
+    sort.offset == null && sort.fetch == null
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sort.getCollation,
+      rel.getRowType
+    )
+  }
+}
+
+object DataSetSortRule {
+  val INSTANCE: RelOptRule = new DataSetSortRule
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 6485139868358..68e1041c52fd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -21,14 +21,13 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.util.NlsString
 import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls
-import org.apache.flink.api.table.expressions.{ExpressionParser, Naming,
-  UnresolvedFieldReference, Expression}
+import org.apache.flink.api.table.expressions._
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -209,7 +208,7 @@ class Table(
 
     relBuilder.push(relNode)
     relBuilder.filter(predicate.toRexNode(relBuilder))
-    
+
     new Table(relBuilder.build(), tableEnv)
   }
 
@@ -401,6 +400,50 @@ class Table(
     new Table(relBuilder.build(), tableEnv)
   }
 
+  /**
+   * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+   * The resulting Table is globally sorted across all parallel partitions.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc)
+   * }}}
+   */
+  def orderBy(fields: Expression*): Table = {
+    relBuilder.push(relNode)
+
+    if (!fields.forall {
+      case _: UnresolvedFieldReference => true
+      case o: Ordering => o.child.isInstanceOf[UnresolvedFieldReference]
+      case _ => false
+    }) {
+      throw new IllegalArgumentException("All expressions must be field references " +
+        "or asc/desc expressions.")
+    }
+
+    val exprs = fields.map(_.toRexNode(relBuilder))
+
+    relBuilder.sort(exprs.asJava)
+    new Table(relBuilder.build(), tableEnv)
+
+  }
+
+  /**
+   * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+   * The resulting Table is globally sorted across all parallel partitions.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy("name.desc")
+   * }}}
+   */
+  def orderBy(fields: String): Table = {
+    val parsedFields = ExpressionParser.parseExpressionList(fields)
+    orderBy(parsedFields: _*)
+  }
+
   private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
 
     val names = exprs.map{ e =>
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
new file mode 100644
index 0000000000000..94361c66090b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  def getExecutionEnvironment = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env
+  }
+
+  val tupleDataSetStrings = List((1, 1L, "Hi"),
+    (2, 2L, "Hello"),
+    (3, 2L, "Hello world"),
+    (4, 3L, "Hello world, how are you?"),
+    (5, 3L, "I am fine."),
+    (6, 3L, "Luke Skywalker"),
+    (7, 4L, "Comment#1"),
+    (8, 4L, "Comment#2"),
+    (9, 4L, "Comment#3"),
+    (10, 4L, "Comment#4"),
+    (11, 5L, "Comment#5"),
+    (12, 5L, "Comment#6"),
+    (13, 5L, "Comment#7"),
+    (14, 5L, "Comment#8"),
+    (15, 5L, "Comment#9"),
+    (16, 6L, "Comment#10"),
+    (17, 6L, "Comment#11"),
+    (18, 6L, "Comment#12"),
+    (19, 6L, "Comment#13"),
+    (20, 6L, "Comment#14"),
+    (21, 6L, "Comment#15"))
+
+  @Test
+  def testOrderByDesc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.desc)
+    implicit def rowOrdering[T <: Product]: Ordering[T] = Ordering.by((x: T) =>
+      -x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc)
+    implicit def rowOrdering[T <: Product]: Ordering[T] = Ordering.by((x: T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)
+    implicit def rowOrdering[T <: Product]: Ordering[T] = Ordering.by((x: T) =>
+      (x.productElement(0).asInstanceOf[Int], -x.productElement(1).asInstanceOf[Long]))
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsWithSql(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+    implicit def rowOrdering[T <: Product]: Ordering[T] = Ordering.by((x: T) =>
+      (-x.productElement(0).asInstanceOf[Int], -x.productElement(1).asInstanceOf[Long]))
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
+    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  }
+
+}
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a5112ec5850a4..4dda4cf51dc37 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -430,14 +430,22 @@ protected static File asFile(String path) {
 	// --------------------------------------------------------------------------------------------
 
 	public static void compareResultAsTuples(List result, String expected) {
-		compareResult(result, expected, true);
+		compareResult(result, expected, true, true);
 	}
 
 	public static void compareResultAsText(List result, String expected) {
-		compareResult(result, expected, false);
+		compareResult(result, expected, false, true);
+	}
+
+	public static void compareOrderedResultAsText(List result, String expected) {
+		compareResult(result, expected, false, false);
+	}
+
+	public static void compareOrderedResultAsText(List result, String expected, boolean asTuples) {
+		compareResult(result, expected, asTuples, false);
 	}
 
-	private static void compareResult(List result, String expected, boolean asTuples) {
+	private static void compareResult(List result, String expected, boolean asTuples, boolean sort) {
 		String[] expectedStrings = expected.split("\n");
 		String[] resultStrings = new String[result.size()];
 
@@ -466,8 +474,10 @@ private static void compareResult(List result, String expected, boolean a
 
 		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
 
-		Arrays.sort(expectedStrings);
-		Arrays.sort(resultStrings);
+		if (sort) {
+			Arrays.sort(expectedStrings);
+			Arrays.sort(resultStrings);
+		}
 
 		for (int i = 0; i < expectedStrings.length; i++) {
 			assertEquals(expectedStrings[i], resultStrings[i]);
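
For reference, a minimal usage sketch of the new sort API (not part of the patch). It assumes the imports used by SortITCase above and the single-argument TableEnvironment.getTableEnvironment overload; the input data and object name are illustrative.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

object OrderBySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Illustrative input; any DataSet with a deterministic field order works.
    val ds = env.fromElements((3, 2L, "c"), (1, 1L, "a"), (2, 2L, "b"))

    // Expression DSL variant: globally sorted output, descending on field _1.
    val sorted = ds.toTable(tEnv).orderBy('_1.desc)

    // String variant parsed by ExpressionParser: ds.toTable(tEnv).orderBy("_1.desc")
    // SQL variant (after tEnv.registerDataSet("t", ds)): tEnv.sql("SELECT * FROM t ORDER BY _1 DESC")

    sorted.toDataSet[Row].print()
  }
}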