@@ -65,6 +65,9 @@ trait ImplicitExpressionOperations {

def as(name: Symbol) = Naming(expr, name.name)

def asc = Asc(expr)
def desc = Desc(expr)

/**
* Conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.
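These suffix operations expose ascending/descending sort order directly in the Scala expression DSL. A minimal usage sketch, assuming this change also wires an orderBy method on Table (not shown in this excerpt) and that the usual implicit Symbol-to-expression conversions are in scope; table and field names are illustrative:

// 'name.asc and 'amount.desc resolve through ImplicitExpressionOperations
// to Asc(...) and Desc(...) wrapping the respective field references.
val sorted = table.orderBy('name.asc, 'amount.desc)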
@@ -52,6 +52,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val CAST: Keyword = Keyword("cast")
lazy val NULL: Keyword = Keyword("Null")
lazy val EVAL: Keyword = Keyword("eval")
lazy val ASC: Keyword = Keyword("asc")
lazy val DESC: Keyword = Keyword("desc")

def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -124,6 +126,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val suffixIsNotNull: PackratParser[Expression] =
composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }

lazy val suffixAsc : PackratParser[Expression] =
(atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) })

lazy val suffixDesc : PackratParser[Expression] =
(atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) })


lazy val suffixSum: PackratParser[Expression] =
composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }

@@ -181,7 +190,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {

lazy val suffixed: PackratParser[Expression] =
suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall
suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall |
suffixAsc | suffixDesc

// prefix operators

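The new ASC/DESC keywords and the suffixAsc/suffixDesc productions make the same orderings available to the String-based expression API. A minimal sketch, using ExpressionParser.parseExpression as the entry point and an illustrative field name:

// Parses into Asc(...)/Desc(...) around the parsed field reference,
// mirroring what 'name.asc produces in the Scala DSL.
val byNameAsc = ExpressionParser.parseExpression("name.asc")
val byNameDesc = ExpressionParser.parseExpression("name.desc")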
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder

abstract class Ordering extends UnaryExpression { self: Product =>
}

case class Asc(child: Expression) extends Ordering {
override def toString: String = s"($child).asc"

override def name: String = child.name + "-asc"

override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
child.toRexNode
}
}

case class Desc(child: Expression) extends Ordering {
override def toString: String = s"($child).desc"

override def name: String = child.name + "-desc"

override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.desc(child.toRexNode)
}
}
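The asymmetry between the two toRexNode implementations is deliberate: Calcite's RelBuilder treats a plain RexNode in a sort collation as ascending, so only Desc needs to mark its child via relBuilder.desc. A rough Calcite-level sketch of what these expressions feed into (field names are illustrative, assuming a RelBuilder already positioned on the input):

// Asc contributes the bare field node, Desc the desc-wrapped one:
relBuilder.sort(relBuilder.field("a"), relBuilder.desc(relBuilder.field("b")))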
@@ -81,31 +81,14 @@ abstract class BatchScan(

// conversion
if (determinedType != inputType) {
val generator = new CodeGenerator(
config,
input.getType,
flinkTable.fieldIndexes)

val conversion = generator.generateConverterResultExpression(
val mapFunc = getConversionMapper(
config,
inputType,
determinedType,
getRowType.getFieldNames)

val body =
s"""
|${conversion.code}
|return ${conversion.resultTerm};
|""".stripMargin

val genFunction = generator.generateFunction(
"DataSetSourceConversion",
classOf[MapFunction[Any, Any]],
body,
determinedType)

val mapFunc = new MapRunner[Any, Any](
genFunction.name,
genFunction.code,
genFunction.returnType)
getRowType.getFieldNames,
Some(flinkTable.fieldIndexes))

val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"

@@ -143,7 +143,12 @@ class DataSetAggregate(
expectedType match {
case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
result.map(typeConversion(config, rowTypeInfo, expectedType.get))
result.map(getConversionMapper(config,
rowTypeInfo.asInstanceOf[TypeInformation[Any]],
expectedType.get,
"AggregateOutputConversion",
rowType.getFieldNames.asScala
))
.name(mapName)
case _ => result
}
@@ -180,32 +185,4 @@ class DataSetAggregate(
}.mkString(", ")
}

private def typeConversion(
config: TableConfig,
rowTypeInfo: RowTypeInfo,
expectedType: TypeInformation[Any]): MapFunction[Any, Any] = {

val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]])
val conversion = generator.generateConverterResultExpression(
expectedType, rowType.getFieldNames.asScala)

val body =
s"""
|${conversion.code}
|return ${conversion.resultTerm};
|""".stripMargin

val genFunction = generator.generateFunction(
"AggregateOutputConversion",
classOf[MapFunction[Any, Any]],
body,
expectedType)

new MapRunner[Any, Any](
genFunction.name,
genFunction.code,
genFunction.returnType)

}

}
@@ -21,10 +21,13 @@ package org.apache.flink.api.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig}
import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment}
import org.apache.flink.api.table.plan.nodes.FlinkRel
import org.apache.flink.api.table.runtime.MapRunner

import scala.collection.JavaConversions._

@@ -64,4 +67,39 @@ trait DataSetRel extends RelNode with FlinkRel {

}

private[dataset] def getConversionMapper(
config: TableConfig,
inputType: TypeInformation[Any],
expectedType: TypeInformation[Any],
conversionOperatorName: String,
fieldNames: Seq[String],
inputPojoFieldMapping: Option[Array[Int]] = None)
: MapFunction[Any, Any] = {

val generator = new CodeGenerator(
config,
inputType,
None,
inputPojoFieldMapping)
val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)

val body =
s"""
|${conversion.code}
|return ${conversion.resultTerm};
|""".stripMargin

val genFunction = generator.generateFunction(
conversionOperatorName,
classOf[MapFunction[Any, Any]],
body,
expectedType)

new MapRunner[Any, Any](
genFunction.name,
genFunction.code,
genFunction.returnType)

}

}
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.plan.schema.DataSetTable


/**
* Flink RelNode which matches along with DataSource.
* It ensures that types without deterministic field order (e.g. POJOs) are not part of
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.plan.nodes.dataset

import java.util

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.typeutils.TypeConverter._

import scala.collection.JavaConverters._

class DataSetSort(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inp: RelNode,
collations: RelCollation,
rowType2: RelDataType)
extends SingleRel(cluster, traitSet, inp)
with DataSetRel{

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={
new DataSetSort(
cluster,
traitSet,
inputs.get(0),
collations,
rowType2
)
}

override def translateToPlan(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
Review comment (Contributor):
We are ignoring the expectedType. The convention here is to return a DataSet of that type.
If the input type (inputDS.getType()) is not equal to the expected type, we need to add a Map function after the sort, which translates the records into the expected type. See DataSetSource for an example of how to do that.


val config = tableEnv.getConfig

val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
var partitionedDs = if (currentParallelism == 1) {
inputDS
} else {
inputDS.partitionByRange(fieldCollations.map(_._1): _*)
.withOrders(fieldCollations.map(_._2): _*)
}

fieldCollations.foreach { fieldCollation =>
partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
}

val inputType = partitionedDs.getType
expectedType match {

case None if config.getEfficientTypeUsage =>
partitionedDs

case _ =>
val determinedType = determineReturnType(
getRowType,
expectedType,
config.getNullCheck,
config.getEfficientTypeUsage)

// conversion
if (determinedType != inputType) {

val mapFunc = getConversionMapper(config,
partitionedDs.getType,
determinedType,
"DataSetSortConversion",
getRowType.getFieldNames.asScala
)

partitionedDs.map(mapFunc)
}
// no conversion necessary, forward
else {
partitionedDs
}
}
}

private def directionToOrder(direction: Direction) = {
direction match {
case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
}

}

private val fieldCollations = collations.getFieldCollations.asScala
.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))

private val sortFieldsToString = fieldCollations
.map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")

override def toString: String = s"Sort(by: $sortFieldsToString)"

override def explainTerms(pw: RelWriter) : RelWriter = {
super.explainTerms(pw)
.item("orderBy", sortFieldsToString)
}
}
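translateToPlan follows the standard DataSet recipe for a global sort: range-partition the input on the sort keys (skipped when the parallelism is 1), then sort every partition locally on the same keys. A standalone sketch of that pattern on a plain Java-API DataSet, with illustrative field indices and orders:

// Range partitioning aligns partition boundaries with the sort order, so the
// subsequent per-partition sorts produce a globally ordered result.
val globallySorted = input
  .partitionByRange(0, 1)
  .withOrders(Order.ASCENDING, Order.DESCENDING)
  .sortPartition(0, Order.ASCENDING)
  .sortPartition(1, Order.DESCENDING)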
@@ -102,6 +102,7 @@ object FlinkRuleSets {
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
DataSetUnionRule.INSTANCE,
DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE
)
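DataSetSortRule itself is not part of this excerpt; the line above only registers it in the optimization rule set. A rough sketch of the converter-rule shape such a rule typically has in this package, assuming the DataSetConvention used by the other DataSet rules (details are illustrative, not the PR's actual file):

class DataSetSortRule
  extends ConverterRule(
    classOf[LogicalSort],
    Convention.NONE,
    DataSetConvention.INSTANCE,
    "DataSetSortRule") {

  override def convert(rel: RelNode): RelNode = {
    val sort = rel.asInstanceOf[LogicalSort]
    val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
    // Convert the input to the DataSet convention and re-create the sort as a DataSetSort.
    val convInput = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
    new DataSetSort(rel.getCluster, traitSet, convInput, sort.getCollation, rel.getRowType)
  }
}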