From b69338d570f51f2206ba15a2a6b7019256355b5a Mon Sep 17 00:00:00 2001 From: dawidwys Date: Fri, 22 Apr 2016 21:31:28 +0200 Subject: [PATCH 1/7] [FLINK-2946] Add orderBy() to Table API --- .../flink/api/scala/table/expressionDsl.scala | 3 + .../table/expressions/ExpressionParser.scala | 12 +- .../api/table/expressions/ordering.scala | 40 +++++ .../plan/nodes/dataset/DataSetSort.scala | 108 +++++++++++ .../api/table/plan/rules/FlinkRuleSets.scala | 1 + .../plan/rules/dataSet/DataSetSortRule.scala | 51 ++++++ .../org/apache/flink/api/table/table.scala | 53 +++++- .../api/scala/table/test/SortITCase.scala | 168 ++++++++++++++++++ .../apache/flink/test/util/TestBaseUtils.java | 21 ++- 9 files changed, 447 insertions(+), 10 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index c6f14f3523812..c4bc1be6436d8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -65,6 +65,9 @@ trait ImplicitExpressionOperations { def as(name: Symbol) = Naming(expr, name.name) + def asc = Asc(expr) + def desc = Desc(expr) + /** * Conditional operator that decides which of two other expressions should be evaluated * based on an evaluated boolean condition. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index a20a8e95b58d2..ffadca506a908 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -52,6 +52,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val CAST: Keyword = Keyword("cast") lazy val NULL: Keyword = Keyword("Null") lazy val EVAL: Keyword = Keyword("eval") + lazy val ASC: Keyword = Keyword("asc") + lazy val DESC: Keyword = Keyword("desc") def functionIdent: ExpressionParser.Parser[String] = not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ @@ -124,6 +126,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffixIsNotNull: PackratParser[Expression] = composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) } + lazy val suffixAsc : PackratParser[Expression] = + (atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) }) + + lazy val suffixDesc : PackratParser[Expression] = + (atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) }) + + lazy val suffixSum: PackratParser[Expression] = composite <~ "." 
~ SUM ~ opt("()") ^^ { e => Sum(e) } @@ -181,7 +190,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffixed: PackratParser[Expression] = suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg | - suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall + suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall | + suffixAsc | suffixDesc // prefix operators diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala new file mode 100644 index 0000000000000..f40edf0fefcfb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder + +case class Asc(child: Expression) extends UnaryExpression { + override def toString: String = s"($child).asc" + + override def name: String = child.name + "-asc" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + child.toRexNode + } +} + +case class Desc(child: Expression) extends UnaryExpression { + override def toString: String = s"($child).desc" + + override def name: String = child.name + "-desc" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.desc(child.toRexNode) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala new file mode 100644 index 0000000000000..2c429f9660807 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator} +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConverters._ + +class DataSetSort( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inp: RelNode, + collations: RelCollation, + rowType: RelDataType) + extends SingleRel(cluster, traitSet, inp) + with DataSetRel{ + + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ + new DataSetSort( + cluster, + traitSet, + inputs.get(0), + collations, + rowType + ) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = { + + val config = tableEnv.getConfig + + val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)) + + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage + ) + + val fieldCollations = collations.getFieldCollations.asScala + .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + + val currentParallelism = if (inputDS.getParallelism >= 1) { + inputDS.getParallelism + } else { + inputDS.getExecutionEnvironment.getParallelism + } + + var partitionedDs = if (currentParallelism == 1) { + inputDS.setParallelism(1) + } else { + wrapDataSet(inputDS.partitionByRange(fieldCollations.map(_._1): _*).javaSet + .asInstanceOf[PartitionOperator[Any]] + .withOrders(fieldCollations.map(_._2): _*)) + } + + fieldCollations.foreach { fieldCollation => + partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) + } + + partitionedDs.javaSet + } + + private def directionToOrder(direction: Direction) = { + direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + } + + } + + private def wrapDataSet(dataSet: DataSet[Any]) = { + new org.apache.flink.api.scala.DataSet(dataSet) + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 06a8a8413da76..5d5912bed78c3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -102,6 +102,7 @@ object FlinkRuleSets { DataSetJoinRule.INSTANCE, DataSetScanRule.INSTANCE, DataSetUnionRule.INSTANCE, + DataSetSortRule.INSTANCE, DataSetValuesRule.INSTANCE, BatchTableSourceScanRule.INSTANCE ) diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala new file mode 100644 index 0000000000000..903c3407a1a64 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort} + +class DataSetSortRule + extends ConverterRule( + classOf[LogicalSort], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkSortRule") { + override def convert(rel: RelNode): RelNode = { + + val sort: LogicalSort = rel.asInstanceOf[LogicalSort] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE) + + new DataSetSort( + rel.getCluster, + traitSet, + convInput, + sort.getCollation, + rel.getRowType + ) + } +} + +object DataSetSortRule { + val INSTANCE: RelOptRule = new DataSetSortRule +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 6485139868358..7f785b5bdce82 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -21,14 +21,13 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataTypeField import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rel.logical.LogicalProject -import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode} import org.apache.calcite.sql.SqlKind import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} import org.apache.calcite.util.NlsString import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls -import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, - UnresolvedFieldReference, Expression} +import org.apache.flink.api.table.expressions._ import scala.collection.mutable import scala.collection.JavaConverters._ @@ -209,7 +208,7 @@ class Table( relBuilder.push(relNode) 
relBuilder.filter(predicate.toRexNode(relBuilder)) - + new Table(relBuilder.build(), tableEnv) } @@ -401,6 +400,52 @@ class Table( new Table(relBuilder.build(), tableEnv) } + /** + * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across + * all partitions with keys equal to the given fields. + * + * Example: + * + * {{{ + * tab.orderBy('name) + * }}} + */ + def orderBy(fields: Expression*): Table = { + relBuilder.push(relNode) + + if (! fields.forall { + case x: UnresolvedFieldReference => true + case x@(_: Asc | _: Desc) => x.asInstanceOf[UnaryExpression] + .child + .isInstanceOf[UnresolvedFieldReference] + case _ => false + }) { + throw new IllegalArgumentException("All expressions must be field references " + + "or asc/desc expressions.") + } + + val exprs = fields.map(_.toRexNode(relBuilder)) + + relBuilder.sort(exprs.asJava) + new Table(relBuilder.build(), tableEnv) + + } + + /** + * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across + * all partitions with keys equal to the given fields. + * + * Example: + * + * {{{ + * tab.orderBy("name") + * }}} + */ + def orderBy(fields: String): Table = { + val parsedFields = ExpressionParser.parseExpressionList(fields) + orderBy(parsedFields: _*) + } + private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { val names = exprs.map{ e => diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala new file mode 100644 index 0000000000000..b30d5fa6da071 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testSimpleOrderBy(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1) + + val expected = "1,First\n2,Second\n3,Third" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByDesc(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + + val expected = "3,Third\n2,Second\n1,First" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByAsc(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.asc) + + val expected = "1,First\n2,Second\n3,Third" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByFieldName(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .as('id, 'name).orderBy("id") + + val expected = "1,First\n2,Second\n3,Third" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByDescStringExpression(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .as('id, 'name).orderBy("id.desc") + + val expected = "3,Third\n2,Second\n1,First" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByDescStringExpressionKeyword(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .as('id, 'name).orderBy("id DESC") + + val expected = "3,Third\n2,Second\n1,First" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def 
testOrderByMultipleFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"), + (2, 1, "Third")).toTable(tEnv).orderBy('_1.desc, '_2.desc) + + val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First" + val results = t.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByMultipleFieldsDifferentDirections(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + env.setParallelism(2) + val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), + (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth")) + .toTable(tEnv).orderBy('_1.asc, '_2.desc) + + val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" + val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + + implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], + - x.productElement(1).asInstanceOf[Int])) + + val result = results.sortBy(p => p.min).reduceLeft(_ ++ _) + + compareOrderedResultAsText(expected, result) + } + + private def compareOrderedResultAsText[T](expected: String, results: Seq[T]) = { + if (configMode == TableConfigMode.EFFICIENT) { + results match { + case x if x.exists(_.isInstanceOf[Tuple]) => + TestBaseUtils.compareOrderedResultAsText(results.asJava, expected, true) + case x if x.exists(_.isInstanceOf[Product]) => + TestBaseUtils.compareOrderedResultAsText(results.asInstanceOf[Seq[Product]] + .map(_.productIterator.mkString(",")).asJava, expected) + case _ => TestBaseUtils.compareOrderedResultAsText(results.asJava, expected) + } + } else { + TestBaseUtils.compareOrderedResultAsText(results.asJava, expected) + } + } + +} diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index a5112ec5850a4..4dda4cf51dc37 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -430,14 +430,23 @@ protected static File asFile(String path) { // -------------------------------------------------------------------------------------------- public static void compareResultAsTuples(List result, String expected) { - compareResult(result, expected, true); + compareResult(result, expected, true, true); } public static void compareResultAsText(List result, String expected) { - compareResult(result, expected, false); + compareResult(result, expected, + false, true); + } + + public static void compareOrderedResultAsText(List result, String expected) { + compareResult(result, expected, false, false); + } + + public static void compareOrderedResultAsText(List result, String expected, boolean asTuples) { + compareResult(result, expected, asTuples, false); } - private static void compareResult(List result, String expected, boolean asTuples) { + private static void compareResult(List result, String expected, boolean asTuples, boolean sort) { String[] expectedStrings = expected.split("\n"); String[] resultStrings = new String[result.size()]; @@ -466,8 +475,10 @@ private static void compareResult(List result, String expected, boolean a assertEquals("Wrong number of elements result", expectedStrings.length, 
resultStrings.length); - Arrays.sort(expectedStrings); - Arrays.sort(resultStrings); + if (sort) { + Arrays.sort(expectedStrings); + Arrays.sort(resultStrings); + } for (int i = 0; i < expectedStrings.length; i++) { assertEquals(expectedStrings[i], resultStrings[i]); From b84677fe5cd66f809f1bfd00a875d204a3515731 Mon Sep 17 00:00:00 2001 From: dawidwys Date: Fri, 22 Apr 2016 22:07:20 +0200 Subject: [PATCH 2/7] Removed tests that result in the same execution plan --- .../api/scala/table/test/SortITCase.scala | 52 ------------------- 1 file changed, 52 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala index b30d5fa6da071..19567c69df1eb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala @@ -38,19 +38,6 @@ class SortITCase( configMode: TableConfigMode) extends TableProgramsTestBase(mode, configMode) { - @Test - def testSimpleOrderBy(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .orderBy('_1) - - val expected = "1,First\n2,Second\n3,Third" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) - } - @Test def testOrderByDesc(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -77,45 +64,6 @@ class SortITCase( compareOrderedResultAsText(expected, results) } - @Test - def testOrderByFieldName(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .as('id, 'name).orderBy("id") - - val expected = "1,First\n2,Second\n3,Third" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) - } - - @Test - def testOrderByDescStringExpression(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .as('id, 'name).orderBy("id.desc") - - val expected = "3,Third\n2,Second\n1,First" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) - } - - @Test - def testOrderByDescStringExpressionKeyword(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .as('id, 'name).orderBy("id DESC") - - val expected = "3,Third\n2,Second\n1,First" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) - } - @Test def testOrderByMultipleFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment From 445e110d147be359cfabc7e2a68d4b8bbc2136ea Mon Sep 17 00:00:00 2001 From: dawidwys Date: Fri, 22 Apr 2016 22:16:00 +0200 Subject: [PATCH 3/7] Added test with SQL query --- .../api/scala/table/test/SortITCase.scala | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala index 19567c69df1eb..8a8bec1d7b328 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala @@ -64,19 +64,6 @@ class SortITCase( compareOrderedResultAsText(expected, results) } - @Test - def testOrderByMultipleFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"), - (2, 1, "Third")).toTable(tEnv).orderBy('_1.desc, '_2.desc) - - val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) - } - @Test def testOrderByMultipleFieldsDifferentDirections(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -98,6 +85,24 @@ class SortITCase( compareOrderedResultAsText(expected, result) } + @Test + def testOrderByMultipleFieldsWithSql(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC" + + val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"), + (2, 1, "Third")).toTable(tEnv) + tEnv.registerDataSet("MyTable", t) + + val queryResult = tEnv.sql(sqlQuery) + + val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First" + val results = queryResult.toDataSet[Row].setParallelism(1).collect() + compareOrderedResultAsText(expected, results) + } + private def compareOrderedResultAsText[T](expected: String, results: Seq[T]) = { if (configMode == TableConfigMode.EFFICIENT) { results match { From e9cf63431704a2ddbcf5126b831cc735bcb7d7ad Mon Sep 17 00:00:00 2001 From: dawidwys Date: Mon, 25 Apr 2016 20:15:42 +0200 Subject: [PATCH 4/7] Conversion to expected type. Tests upgraded. 
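
DataSetSort no longer always emits Rows: translateToPlan now compares the input type with the determined return type and, when they differ, appends a code-generated conversion mapper (shared as getConversionMapper). The physical translation range-partitions the input on the sort keys and then sorts each partition locally, which together yields a globally sorted result. A minimal hand-written DataSet sketch of this strategy (an illustration only, assuming a single ascending key; the translated plan additionally applies the per-key orders via PartitionOperator#withOrders when the input is parallel):

    import org.apache.flink.api.common.operators.Order
    import org.apache.flink.api.scala._

    object GlobalSortSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // Range partitioning assigns disjoint, ordered key ranges to the
        // parallel partitions; sorting every partition locally afterwards
        // makes the concatenation of the partitions globally sorted.
        val sorted = env.fromElements((3, "Third"), (1, "First"), (2, "Second"))
          .partitionByRange(0)
          .sortPartition(0, Order.ASCENDING)
        sorted.print()
      }
    }
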
--- .../api/table/expressions/ordering.scala | 7 +- .../plan/nodes/dataset/DataSetAggregate.scala | 35 +---- .../plan/nodes/dataset/DataSetScan.scala | 1 + .../plan/nodes/dataset/DataSetSort.scala | 83 +++++++----- .../table/plan/nodes/dataset/package.scala | 62 +++++++++ .../plan/rules/dataSet/DataSetSortRule.scala | 14 +- .../org/apache/flink/api/table/table.scala | 18 ++- .../api/scala/table/test/SortITCase.scala | 120 +++++++++++------- 8 files changed, 217 insertions(+), 123 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala index f40edf0fefcfb..a58007c19cc37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala @@ -19,7 +19,10 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -case class Asc(child: Expression) extends UnaryExpression { +abstract class Ordering extends UnaryExpression { self: Product => +} + +case class Asc(child: Expression) extends Ordering{ override def toString: String = s"($child).asc" override def name: String = child.name + "-asc" @@ -29,7 +32,7 @@ case class Asc(child: Expression) extends UnaryExpression { } } -case class Desc(child: Expression) extends UnaryExpression { +case class Desc(child: Expression) extends Ordering { override def toString: String = s"($child).desc" override def name: String = child.name + "-desc" diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 12095a268de55..114122bd020aa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -143,7 +143,12 @@ class DataSetAggregate( expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" - result.map(typeConversion(config, rowTypeInfo, expectedType.get)) + result.map(getConversionMapper(config, + rowTypeInfo.asInstanceOf[TypeInformation[Any]], + expectedType.get, + "AggregateOutputConversion", + rowType.getFieldNames.asScala + )) .name(mapName) case _ => result } @@ -180,32 +185,4 @@ class DataSetAggregate( }.mkString(", ") } - private def typeConversion( - config: TableConfig, - rowTypeInfo: RowTypeInfo, - expectedType: TypeInformation[Any]): MapFunction[Any, Any] = { - - val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]]) - val conversion = generator.generateConverterResultExpression( - expectedType, rowType.getFieldNames.asScala) - - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin - - val genFunction = generator.generateFunction( - "AggregateOutputConversion", - classOf[MapFunction[Any, Any]], - body, - expectedType) - - new MapRunner[Any, Any]( - genFunction.name, - genFunction.code, - 
genFunction.returnType) - - } - } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala index 17a7db21d2599..a75131847a8fe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.BatchTableEnvironment import org.apache.flink.api.table.plan.schema.DataSetTable + /** * Flink RelNode which matches along with DataSource. * It ensures that types without deterministic field order (e.g. POJOs) are not part of diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala index 2c429f9660807..b6dd43faf9a50 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -23,13 +23,12 @@ import java.util import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelFieldCollation.Direction import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel} +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator} -import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.java.typeutils.PojoTypeInfo +import org.apache.flink.api.table.BatchTableEnvironment import org.apache.flink.api.table.typeutils.TypeConverter._ import scala.collection.JavaConverters._ @@ -39,18 +38,17 @@ class DataSetSort( traitSet: RelTraitSet, inp: RelNode, collations: RelCollation, - rowType: RelDataType) + rowType2: RelDataType) extends SingleRel(cluster, traitSet, inp) with DataSetRel{ - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ new DataSetSort( cluster, traitSet, inputs.get(0), collations, - rowType + rowType2 ) } @@ -60,37 +58,50 @@ class DataSetSort( val config = tableEnv.getConfig - val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)) - - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage - ) - - val fieldCollations = collations.getFieldCollations.asScala - .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) - - val currentParallelism = if (inputDS.getParallelism >= 1) { - inputDS.getParallelism - } else { - inputDS.getExecutionEnvironment.getParallelism - } + val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val currentParallelism = inputDS.getExecutionEnvironment.getParallelism var partitionedDs = if (currentParallelism == 1) { - inputDS.setParallelism(1) + inputDS } else { - wrapDataSet(inputDS.partitionByRange(fieldCollations.map(_._1): _*).javaSet - 
.asInstanceOf[PartitionOperator[Any]] - .withOrders(fieldCollations.map(_._2): _*)) + inputDS.partitionByRange(fieldCollations.map(_._1): _*) + .withOrders(fieldCollations.map(_._2): _*) } fieldCollations.foreach { fieldCollation => partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) } - partitionedDs.javaSet + val inputType = partitionedDs.getType + expectedType match { + + case None if config.getEfficientTypeUsage => + partitionedDs + + case _ => + val determinedType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + // conversion + if (determinedType != inputType) { + + val mapFunc = getConversionMapper(config, + partitionedDs.getType, + determinedType, + "DataSetSortConversion", + getRowType.getFieldNames.asScala + ) + + partitionedDs.map(mapFunc) + } + // no conversion necessary, forward + else { + partitionedDs + } + } } private def directionToOrder(direction: Direction) = { @@ -101,8 +112,16 @@ class DataSetSort( } - private def wrapDataSet(dataSet: DataSet[Any]) = { - new org.apache.flink.api.scala.DataSet(dataSet) - } + private val fieldCollations = collations.getFieldCollations.asScala + .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + private val sortFieldsToString = fieldCollations + .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + + override def toString: String = s"Sort(by: $sortFieldsToString)" + + override def explainTerms(pw: RelWriter) : RelWriter = { + super.explainTerms(pw) + .item("by", sortFieldsToString) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala new file mode 100644 index 0000000000000..db07f3841ce6a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.MapRunner + +package object dataset { + + private[dataset] def getConversionMapper(config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Any], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Any] = { + + val generator = new CodeGenerator(config, + inputType, + None, + inputPojoFieldMapping) + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + conversionOperatorName, + classOf[MapFunction[Any, Any]], + body, + expectedType) + + new MapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + } + + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala index 903c3407a1a64..68c01c5c3c63b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala @@ -18,10 +18,11 @@ package org.apache.flink.api.table.plan.rules.dataSet -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalSort +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort} import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort} class DataSetSortRule @@ -30,6 +31,15 @@ class DataSetSortRule Convention.NONE, DataSetConvention.INSTANCE, "FlinkSortRule") { + + /** + * Only translate when no OFFSET or LIMIT specified + */ + override def matches(call: RelOptRuleCall): Boolean = { + val sort = call.rel(0).asInstanceOf[LogicalSort] + sort.offset == null && sort.fetch == null + } + override def convert(rel: RelNode): RelNode = { val sort: LogicalSort = rel.asInstanceOf[LogicalSort] diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 7f785b5bdce82..68e1041c52fd5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -401,23 +401,21 @@ class Table( } /** - * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across - * all partitions with keys equal to the given fields. + * Sorts the given [[Table]]. Similar to SQL ORDER BY. + * The resulting Table is globally sorted across all parallel partitions. * * Example: * * {{{ - * tab.orderBy('name) + * tab.orderBy('name.desc) * }}} */ def orderBy(fields: Expression*): Table = { relBuilder.push(relNode) if (! fields.forall { - case x: UnresolvedFieldReference => true - case x@(_: Asc | _: Desc) => x.asInstanceOf[UnaryExpression] - .child - .isInstanceOf[UnresolvedFieldReference] + case x : UnresolvedFieldReference => true + case x : Ordering => x.child.isInstanceOf[UnresolvedFieldReference] case _ => false }) { throw new IllegalArgumentException("All expressions must be field references " + @@ -432,13 +430,13 @@ class Table( } /** - * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across - * all partitions with keys equal to the given fields. + * Sorts the given [[Table]]. Similar to SQL ORDER BY. + * The resulting Table is globally sorted across all parallel partitions. * * Example: * * {{{ - * tab.orderBy("name") + * tab.orderBy("name DESC") * }}} */ def orderBy(fields: String): Table = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala index 8a8bec1d7b328..ddb616cc71860 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala @@ -18,8 +18,8 @@ package org.apache.flink.api.scala.table.test -import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.api.table.{Row, TableEnvironment} import org.apache.flink.api.table.test.utils.TableProgramsTestBase @@ -38,84 +38,108 @@ class SortITCase( configMode: TableConfigMode) extends TableProgramsTestBase(mode, configMode) { + def getExecutionEnvironment = { + val env = ExecutionEnvironment.getExecutionEnvironment + env.setParallelism(4) + env + } + + val tupleDataSetStrings = List((1, 1L, "Hi") + ,(2, 2L, "Hello") + ,(3, 2L, "Hello world") + ,(4, 3L, "Hello world, how are you?") + ,(5, 3L, "I am fine.") + ,(6, 3L, "Luke Skywalker") + ,(7, 4L, "Comment#1") + ,(8, 4L, "Comment#2") + ,(9, 4L, "Comment#3") + ,(10, 4L, "Comment#4") + ,(11, 5L, "Comment#5") + ,(12, 5L, "Comment#6") + ,(13, 5L, "Comment#7") + ,(14, 5L, "Comment#8") + ,(15, 5L, "Comment#9") + ,(16, 6L, "Comment#10") + ,(17, 6L, "Comment#11") + ,(18, 6L, "Comment#12") + ,(19, 6L, "Comment#13") + ,(20, 6L, "Comment#14") + ,(21, 6L, "Comment#15")) + @Test def testOrderByDesc(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment + val env = getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .orderBy('_1.desc) - val expected = "3,Third\n2,Second\n1,First" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) + val ds = CollectionDataSets.get3TupleDataSet(env) + val t = ds.toTable(tEnv).orderBy('_1.desc) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + + val expected = sortExpectedly(tupleDataSetStrings) + val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + + val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _) + + 
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @Test def testOrderByAsc(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment + val env = getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) - .orderBy('_1.asc) - val expected = "1,First\n2,Second\n3,Third" - val results = t.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) + val ds = CollectionDataSets.get3TupleDataSet(env) + val t = ds.toTable(tEnv).orderBy('_1.asc) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => x.productElement(0).asInstanceOf[Int]) + + val expected = sortExpectedly(tupleDataSetStrings) + val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + + val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _) + + TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @Test def testOrderByMultipleFieldsDifferentDirections(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment + val env = getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - - env.setParallelism(2) - val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), - (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth")) - .toTable(tEnv).orderBy('_1.asc, '_2.desc) - val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val ds = CollectionDataSets.get3TupleDataSet(env) + val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => (x.productElement(0).asInstanceOf[Int], + - x.productElement(1).asInstanceOf[Long])) - implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], - - x.productElement(1).asInstanceOf[Int])) + val expected = sortExpectedly(tupleDataSetStrings) + val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.sortBy(p => p.min).reduceLeft(_ ++ _) + val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _) - compareOrderedResultAsText(expected, result) + TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @Test def testOrderByMultipleFieldsWithSql(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment + val env = getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC" + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => (- x.productElement(0).asInstanceOf[Int], + - x.productElement(1).asInstanceOf[Long])) + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds) - val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"), - (2, 1, "Third")).toTable(tEnv) - tEnv.registerDataSet("MyTable", t) + val expected = sortExpectedly(tupleDataSetStrings) + val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val queryResult = tEnv.sql(sqlQuery) + val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _) - val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First" - val results = queryResult.toDataSet[Row].setParallelism(1).collect() - compareOrderedResultAsText(expected, results) + 
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } - private def compareOrderedResultAsText[T](expected: String, results: Seq[T]) = { - if (configMode == TableConfigMode.EFFICIENT) { - results match { - case x if x.exists(_.isInstanceOf[Tuple]) => - TestBaseUtils.compareOrderedResultAsText(results.asJava, expected, true) - case x if x.exists(_.isInstanceOf[Product]) => - TestBaseUtils.compareOrderedResultAsText(results.asInstanceOf[Seq[Product]] - .map(_.productIterator.mkString(",")).asJava, expected) - case _ => TestBaseUtils.compareOrderedResultAsText(results.asJava, expected) - } - } else { - TestBaseUtils.compareOrderedResultAsText(results.asJava, expected) - } + def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = { + dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "") } } From 7dfc90fb8f5244ee539466c5bd9bf3101fad5918 Mon Sep 17 00:00:00 2001 From: dawidwys Date: Tue, 26 Apr 2016 21:36:43 +0200 Subject: [PATCH 5/7] Shortened lines to under 100 characters --- .../flink/api/scala/table/test/SortITCase.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala index ddb616cc71860..9395561b77f57 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala @@ -73,7 +73,8 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.desc) - implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => + - x.productElement(0).asInstanceOf[Int]) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() @@ -90,7 +91,8 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.asc) - implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => x.productElement(0).asInstanceOf[Int]) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => + x.productElement(0).asInstanceOf[Int]) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() @@ -107,8 +109,8 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc) - implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => (x.productElement(0).asInstanceOf[Int], - - x.productElement(1).asInstanceOf[Long])) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => + (x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long])) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() @@ -124,8 +126,8 @@ class SortITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC" - implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => (- x.productElement(0).asInstanceOf[Int], - - x.productElement(1).asInstanceOf[Long])) + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => + (- 
x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long])) val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) From 7f67b42739b4b0c0a035da291ae52b04bcdd784f Mon Sep 17 00:00:00 2001 From: dawidwys Date: Thu, 28 Apr 2016 07:42:34 +0200 Subject: [PATCH 6/7] Code style refactoring --- .../api/table/expressions/ordering.scala | 2 +- .../table/plan/nodes/dataset/DataSetRel.scala | 40 +++++++++++- .../plan/nodes/dataset/DataSetSort.scala | 2 +- .../table/plan/nodes/dataset/package.scala | 62 ------------------- .../plan/rules/dataSet/DataSetSortRule.scala | 2 +- .../api/scala/table/test/SortITCase.scala | 8 +-- 6 files changed, 46 insertions(+), 70 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala index a58007c19cc37..75fa07829ef07 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala @@ -22,7 +22,7 @@ import org.apache.calcite.tools.RelBuilder abstract class Ordering extends UnaryExpression { self: Product => } -case class Asc(child: Expression) extends Ordering{ +case class Asc(child: Expression) extends Ordering { override def toString: String = s"($child).asc" override def name: String = child.name + "-asc" diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index e8f81fd53cb8f..b786896781d8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -21,10 +21,13 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig} +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment} import org.apache.flink.api.table.plan.nodes.FlinkRel +import org.apache.flink.api.table.runtime.MapRunner import scala.collection.JavaConversions._ @@ -64,4 +67,39 @@ trait DataSetRel extends RelNode with FlinkRel { } + private[dataset] def getConversionMapper( + config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Any], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Any] = { + + val generator = new CodeGenerator( + config, + inputType, + None, + inputPojoFieldMapping) + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = 
generator.generateFunction( + conversionOperatorName, + classOf[MapFunction[Any, Any]], + body, + expectedType) + + new MapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala index b6dd43faf9a50..ef89b06a9e6cb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -122,6 +122,6 @@ class DataSetSort( override def explainTerms(pw: RelWriter) : RelWriter = { super.explainTerms(pw) - .item("by", sortFieldsToString) + .item("orderBy", sortFieldsToString) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala deleted file mode 100644 index db07f3841ce6a..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
deleted file mode 100644
index db07f3841ce6a..0000000000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.MapRunner
-
-package object dataset {
-
-  private[dataset] def getConversionMapper(config: TableConfig,
-    inputType: TypeInformation[Any],
-    expectedType: TypeInformation[Any],
-    conversionOperatorName: String,
-    fieldNames: Seq[String],
-    inputPojoFieldMapping: Option[Array[Int]] = None)
-  : MapFunction[Any, Any] = {
-
-    val generator = new CodeGenerator(config,
-      inputType,
-      None,
-      inputPojoFieldMapping)
-    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-    val body =
-      s"""
-         |${conversion.code}
-         |return ${conversion.resultTerm};
-         |""".stripMargin
-
-    val genFunction = generator.generateFunction(
-      conversionOperatorName,
-      classOf[MapFunction[Any, Any]],
-      body,
-      expectedType)
-
-    new MapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-  }
-
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
index 68c01c5c3c63b..b7f70e33f33af 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -30,7 +30,7 @@ class DataSetSortRule
     classOf[LogicalSort],
     Convention.NONE,
     DataSetConvention.INSTANCE,
-    "FlinkSortRule") {
+    "DataSetSortRule") {
 
   /**
    * Only translate when no OFFSET or LIMIT specified
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
index 9395561b77f57..94361c66090b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
@@ -79,7 +79,7 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _)
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -97,7 +97,7 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _)
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -115,7 +115,7 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _)
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -135,7 +135,7 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _)
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
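Note on the sortBy(p => p.head) change: each partition collected via
mapPartition is already locally sorted by the preceding range sort, so the
first row of a non-empty partition is also its smallest. Ordering partitions
by their heads therefore rebuilds the global order without the per-partition
scan (and the implicit Ordering over Row) that p.min would need. A
self-contained sketch, with Ints standing in for Rows:

    // Two locally sorted partitions arriving in arbitrary order.
    val partitions = Seq(Seq(4, 5, 6), Seq(1, 2, 3))
    // head of each partition determines its place in the global order.
    val global = partitions.filterNot(_.isEmpty).sortBy(_.head).reduceLeft(_ ++ _)
    // global == List(1, 2, 3, 4, 5, 6)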
From 445450bc47b7f13fc5863fdde8e5dd86bf817c49 Mon Sep 17 00:00:00 2001
From: dawidwys
Date: Thu, 28 Apr 2016 07:55:28 +0200
Subject: [PATCH 7/7] Changes after rebase

---
 .../table/plan/nodes/dataset/BatchScan.scala  | 27 ++++---------------
 1 file changed, 5 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 715109e6b0724..b18d6747be444 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -81,31 +81,14 @@ abstract class BatchScan(
 
     // conversion
     if (determinedType != inputType) {
 
-      val generator = new CodeGenerator(
-        config,
-        input.getType,
-        flinkTable.fieldIndexes)
-      val conversion = generator.generateConverterResultExpression(
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
         determinedType,
-        getRowType.getFieldNames)
-
-      val body =
-        s"""
-           |${conversion.code}
-           |return ${conversion.resultTerm};
-           |""".stripMargin
-
-      val genFunction = generator.generateFunction(
         "DataSetSourceConversion",
-        classOf[MapFunction[Any, Any]],
-        body,
-        determinedType)
-
-      val mapFunc = new MapRunner[Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType)
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
       val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"