From 4e792eac7ff24992921f1750cd757ebf83dc97e2 Mon Sep 17 00:00:00 2001 From: rtudoran Date: Wed, 12 Apr 2017 17:49:04 +0200 Subject: [PATCH 1/3] Add sort backbone support Implement the sort based on process function --- .../nodes/datastream/DataStreamSort.scala | 169 +++++++++ .../table/plan/rules/FlinkRuleSets.scala | 4 +- .../rules/datastream/DataStreamSortRule.scala | 67 ++++ .../ProcTimeBoundedSortProcessFunction.scala | 114 ++++++ .../table/runtime/aggregate/SortUtil.scala | 353 ++++++++++++++++++ .../stream/sql/WindowAggregateTest.scala | 21 ++ ...ocessingOverRangeProcessFunctionTest.scala | 81 ++++ 7 files changed, 808 insertions(+), 1 deletion(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala new file mode 100644 index 0000000000000..46cc6cbed3d33 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._ +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.calcite.sql.SqlAggFunction +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.ProcTimeType +import org.apache.flink.table.functions.RowTimeType +import org.apache.calcite.rel.core.Sort +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil._ + +/** + * Flink RelNode which matches along with Sort Rule. 
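+ * For example, the query used by the plan test added later in this patch,
+ * {{{
+ *   SELECT a FROM MyTable ORDER BY procTime(), c
+ * }}}
+ * is planned as a DataStreamSort on top of a DataStreamCalc.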
+ * + */ +class DataStreamSort( + sort: LogicalSort, + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputNode: RelNode, + rowRelDataType: RelDataType, + inputType: RelDataType, + description: String) + extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel { + + override def deriveRowType(): RelDataType = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataStreamSort( + sort, + cluster, + traitSet, + inputs.get(0), + rowRelDataType, + inputType, + description + sort.getId()) + } + + override def toString: String = { + s"Sort($sort)" + + " on fields: (${sort.collation.getFieldCollations})" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + .item("aggregate", sort) + .item("sort fields",sort.collation.getFieldCollations) + .itemIf("offset", sort.offset, sort.offset!=null) + .itemIf("fetch", sort.fetch, sort.fetch!=null) + .item("input", inputNode) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + + val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + + //need to identify time between others order fields. Time needs to be first sort element + val timeType = SortUtil.getTimeType(sort,inputType) + + //time ordering needs to be ascending + if (SortUtil.getTimeDirection(sort) != Direction.ASCENDING) { + throw new TableException("SQL/Table supports only ascending time ordering") + } + + + val (offset,fetch) = (sort.offset,sort.fetch) + + //enable to extend for other types of aggregates that will not be implemented in a window + timeType match { + case _: ProcTimeType => + (offset,fetch) match { + case (o:Any,f:Any) => null // offset and fetch needs retraction + case (_,f:Any) => null // offset needs retraction + case (o:Any,_) => null // fetch needs retraction + case _ => createSortProcTime(inputDS) //sort can be done with/without retraction + } + case _: RowTimeType => + throw new TableException("SQL/Table does not support sort on row time") + case _ => + throw new TableException("SQL/Table needs to have sort on time as first sort element") + } + + } + + /** + * Create Sort logic based on processing time + */ + def createSortProcTime( + inputDS: DataStream[Row]): DataStream[Row] = { + + + // get the output types + //Sort does not do project.= Hence it will output also the ordering proctime field + //[TODO]Do we need to drop some of the ordering fields? (implement a projection logic? 
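+ //Note on the keyBy below: NullByteKeySelector assigns the same constant key
+ //to every record, so all rows are collected in one logical partition where a
+ //global order can be produced. Conceptually (sketch, not part of this patch):
+ //  class NullByteKeySelector[T] extends KeySelector[T, java.lang.Byte] {
+ //    override def getKey(value: T): java.lang.Byte = 0
+ //  }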
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + + //if the order has secondary sorting fields in addition to the proctime + if( SortUtil.getSortFieldIndexList(sort).size > 1) { + + val processFunction = SortUtil.createProcTimeSortFunction(sort,inputType) + + inputDS + .keyBy(new NullByteKeySelector[Row]) + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(rowTypeInfo) + .asInstanceOf[DataStream[Row]] + } else { + //if the order is done only on proctime we only need to forward the elements + inputDS.keyBy(new NullByteKeySelector[Row]) + .map(new IdentityRowMap()) + .setParallelism(1).setMaxParallelism(1) + .returns(rowTypeInfo) + .asInstanceOf[DataStream[Row]] + } + } + + +} + + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 41f095f032afb..54e3eea7978ed 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -186,7 +186,9 @@ object FlinkRuleSets { // scan optimization PushProjectIntoStreamTableSourceScanRule.INSTANCE, - PushFilterIntoStreamTableSourceScanRule.INSTANCE + PushFilterIntoStreamTableSourceScanRule.INSTANCE, + + DataStreamSortRule.INSTANCE ) /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala new file mode 100644 index 0000000000000..3952725632c1d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet } +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan } +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort + +/** + * Rule to convert a LogicalSort into a DataStreamSort. 
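+ * A minimal way to exercise the rule (sketch based on the stream test
+ * utilities used elsewhere in this patch; expectedPlan stands in for the
+ * asserted plan string):
+ * {{{
+ *   val util = streamTestUtil()
+ *   util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+ *   util.verifySql("SELECT a FROM MyTable ORDER BY procTime(), c", expectedPlan)
+ * }}}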
+ */ +class DataStreamSortRule + extends ConverterRule( + classOf[LogicalSort], + Convention.NONE, + DataStreamConvention.INSTANCE, + "DataStreamSortRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + super.matches(call) + } + + override def convert(rel: RelNode): RelNode = { + val calc: LogicalSort = rel.asInstanceOf[LogicalSort] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE) + + val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType + + new DataStreamSort( + calc, + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + inputRowType, + description + calc.getId()) + + } + +} + +object DataStreamSortRule { + val INSTANCE: RelOptRule = new DataStreamSortRule +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala new file mode 100644 index 0000000000000..832cf98bebd2e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.LinkedList +import java.util.Comparator + + +/** + * Process Function used for the aggregate in bounded proctime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param fieldCount Is used to indicate fields in the current element to forward + * @param inputType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class ProcTimeBoundedSortProcessFunction( + private val fieldCount: Int, + private val inputType: TypeInformation[Row], + private val rowComparator:Comparator[Row]) + extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(rowComparator) + + private var stateEventsBuffer: ListState[Row] = _ + + override def open(config: Configuration) { + val sortDescriptor = new ListStateDescriptor[Row]("sortState", inputType) + stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor) + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + + val currentTime = ctx.timerService.currentProcessingTime + //buffer the event incoming event + + //we accumulate the events as they arrive within the given proctime + stateEventsBuffer.add(input) + + //deduplication of multiple registered timers is done automatically + ctx.timerService.registerProcessingTimeTimer(currentTime + 1) + + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + + var i = 0 + val sortList = new LinkedList[Row] + val iter = stateEventsBuffer.get.iterator() + + + while(iter.hasNext()) + { + sortList.add(iter.next()) + } + + //if we do not rely on java collections to do the sort we could implement + //an insertion sort as we get the elements from the state + sortList.sort(rowComparator) + + //no retraction now + + //no selection of offset/fetch + + //we need to build the output and emit the events in order + var iElemenets = 0 + while (iElemenets < sortList.size) { + out.collect(sortList.get(iElemenets)) + iElemenets += 1 + } + + //we need to clear the events accumulated in the last millisecond + stateEventsBuffer.clear() + + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala new file mode 100644 index 0000000000000..11795450771a4 --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.flink.table.functions.Accumulator +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction + +/** + * Class represents a collection of helper methods to build the sort logic. 
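+ * A typical use from DataStreamSort is (sketch of the calls made there):
+ * {{{
+ *   val processFunction = SortUtil.createProcTimeSortFunction(sort, inputType)
+ *   inputDS.keyBy(new NullByteKeySelector[Row]).process(processFunction)
+ * }}}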
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ + +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on proctime and potentially other fields + * @param calcSort Sort logical object + * @param inputType input row type + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createProcTimeSortFunction( + calcSort: LogicalSort, + inputType: RelDataType): ProcessFunction[Row, Row] = { + + val keySortFields = getSortFieldIndexList(calcSort) + val keySortDirections = getSortFieldDirectionList(calcSort) + + val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo] + + val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections) + + //drop time from comparison + val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) + val orderingsNoTime = orderings.slice(1, keySortFields.size) + + val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime) + + new ProcTimeBoundedSortProcessFunction( + inputType.getFieldCount, + inputRowType, + rowComparator) + + } + + /** + * Function creates a row comparator for the sorting fields based on + * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo] + * @param keyIndex the indexes of the fields on which the sorting is done. + * First is expected to be the time + * @param orderings the [UntypedOrdering] objects + * @return Array of ordering objects + */ + def createRowSortComparator(keyIndex: Array[Int], + orderings:Array[UntypedOrdering]): Comparator[Row] = { + + new SortRowComparator(orderings,keyIndex) + } + + /** + * Function creates comparison objects with embeded type casting + * @param inputType input row type + * @param keyIndex the indexes of the fields on which the sorting is done. 
+ * First is expected to be the time + * @return Array of ordering objects + */ + def createOrderingComparison(inputType: RelDataType, + keyIndex: Array[Int], + orderDirection: Array[Direction]): Array[UntypedOrdering] = { + + var i = 0 + val orderings = new Array[UntypedOrdering](keyIndex.size) + + while (i < keyIndex.size) { + val sqlTypeName = inputType.getFieldList.get(keyIndex(i)).getType.getSqlTypeName + + orderings(i) = sqlTypeName match { + case TINYINT => new ByteOrdering( + BYTE_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case SMALLINT => new SmallOrdering( + SHORT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case INTEGER => new IntOrdering( + INT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case BIGINT => new LongOrdering( + LONG_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case FLOAT => new FloatOrdering( + FLOAT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case DOUBLE => new DoubleOrdering( + DOUBLE_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case DECIMAL => new DecimalOrdering( + BIG_DEC_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case VARCHAR | CHAR => new StringOrdering( + STRING_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + //should be updated when times are merged in master branch based on their types + case TIMESTAMP => new TimestampOrdering( + LONG_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) + case sqlType: SqlTypeName => + throw new TableException("Sort aggregate does no support type:" + sqlType) + } + i += 1 + } + + orderings + } + + /** + * Function returns the array of indexes for the fields on which the sort is done + * @param calcSort The LogicalSort object + * @return [Array[Int]] + */ + def getSortFieldIndexList(calcSort: LogicalSort): Array[Int] = { + val keyFields = calcSort.collation.getFieldCollations + var i = 0 + val keySort = new Array[Int](keyFields.size()) + while (i < keyFields.size()) { + keySort(i) = keyFields.get(i).getFieldIndex + i += 1 + } + + keySort + } + + /** + * Function returns the array of sort direction for each of the sort fields + * @param calcSort The LogicalSort object + * @return [Array[Direction]] + */ + def getSortFieldDirectionList(calcSort: LogicalSort): Array[Direction] = { + val keyFields = calcSort.collation.getFieldCollations + var i = 0 + val keySortDirection = new Array[Direction](keyFields.size()) + while (i < keyFields.size()) { + keySortDirection(i) = getDirection(calcSort,i) + i += 1 + } + keySortDirection + } + + /** + * Function returns the direction type of the time in order clause. + * @param calcSort The LogicalSort object + * @return [Array[Int]] + */ + def getTimeDirection(calcSort: LogicalSort):Direction = { + calcSort.getCollationList.get(0).getFieldCollations.get(0).direction + } + + /** + * Function returns the time type in order clause. 
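+ * For processing-time queries this resolves to ProcTimeType, for event-time
+ * queries to RowTimeType; DataStreamSort.translateToPlan dispatches on it:
+ * {{{
+ *   timeType match {
+ *     case _: ProcTimeType => // supported: createSortProcTime
+ *     case _: RowTimeType  => // rejected for now
+ *     case _               => // error: time must be the first sort field
+ *   }
+ * }}}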
+ * Expectation is that it is the primary sort field + * @param calcSort The LogicalSort object + * @param rowType The data type of the input + * @return [org.apache.calcite.rel.type.RelDataType] + */ + def getTimeType(calcSort: LogicalSort, rowType: RelDataType): RelDataType = { + + //need to identify time between others order fields + // + val ind = calcSort.getCollationList.get(0).getFieldCollations.get(0).getFieldIndex + rowType.getFieldList.get(ind).getValue + } + + /** + * Function returns the direction type of a field in order clause. + * @param calcSort The LogicalSort object + * @return [org.apache.calcite.rel.RelFieldCollation.Direction] + */ + def getDirection(calcSort: LogicalSort, sortField:Int):Direction = { + + calcSort.getCollationList.get(0).getFieldCollations.get(sortField).direction match { + case Direction.ASCENDING => Direction.ASCENDING + case Direction.DESCENDING => Direction.DESCENDING + case _ => throw new TableException("SQL/Table does not support such sorting") + } + + } + +} + + +/** + * Untyped interface for defining comparison method that can be override by typed implementations + * Each typed implementation will cast the generic type to the implicit ordering type used + */ + +trait UntypedOrdering extends Serializable{ + def compare(x: Any, y: Any): Int + +} + +class LongOrdering(private val ord: TypeComparator[JLong]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xL = x.asInstanceOf[JLong] + val yL = y.asInstanceOf[JLong] + ord.compare(xL, yL) + } +} + +class IntOrdering(private val ord: TypeComparator[JInt]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xI = x.asInstanceOf[JInt] + val yI = y.asInstanceOf[JInt] + ord.compare(xI, yI) + } +} + +class FloatOrdering(private val ord: TypeComparator[JFloat]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xF = x.asInstanceOf[JFloat] + val yF = y.asInstanceOf[JFloat] + ord.compare(xF, yF) + } +} + +class DoubleOrdering(private val ord: TypeComparator[JDouble]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xD = x.asInstanceOf[JDouble] + val yD = y.asInstanceOf[JDouble] + ord.compare(xD, yD) + } +} + +class DecimalOrdering(private val ord: TypeComparator[JBigDecimal]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xBD = x.asInstanceOf[JBigDecimal] + val yBD = y.asInstanceOf[JBigDecimal] + ord.compare(xBD, yBD) + } +} + +class ByteOrdering(private val ord: TypeComparator[JByte]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xB = x.asInstanceOf[JByte] + val yB = y.asInstanceOf[JByte] + ord.compare(xB, yB) + } +} + +class SmallOrdering(private val ord: TypeComparator[JShort]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xS = x.asInstanceOf[JShort] + val yS = y.asInstanceOf[JShort] + ord.compare(xS, yS) + } +} + +class StringOrdering(private val ord: TypeComparator[JString]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xS = x.asInstanceOf[JString] + val yS = y.asInstanceOf[JString] + ord.compare(xS, yS) + } +} + + +/** + * Ordering object for timestamps. 
As there is no implicit Ordering for [java.sql.Timestamp] + * we need to compare based on the Long value of the timestamp + */ +class TimestampOrdering(private val ord: TypeComparator[JLong]) extends UntypedOrdering { + + override def compare(x: Any, y: Any): Int = { + val xTs = x.asInstanceOf[Timestamp] + val yTs = y.asInstanceOf[Timestamp] + ord.compare(xTs.getTime, yTs.getTime) + } +} + + +/** + * Called every time when a sort operation is done. It applies the Comparator objects in cascade + * + * @param orderedComparators the sort Comparator objects with type casting + * @param keyIndexes the sort index fields on which to apply the comparison on the inputs + */ +class SortRowComparator( + private val orderedComparators:Array[UntypedOrdering], + private val keyIndexes:Array[Int]) extends Comparator[Row] with Serializable { + override def compare(arg0:Row, arg1:Row):Int = { + + var i = 0 + var result:Int = 0 + while (i < keyIndexes.size && result == 0) { + orderedComparators(i).compare( + arg0.getField(keyIndexes(i)), + arg1.getField(keyIndexes(i))) match { + case 0 => i += 1 //same key and need to sort on consequent keys + case g => { result = g //in case the case does not return the result + i = keyIndexes.size // exit and return the result + } + } + } + result //all sort fields were equal, hence elements are equal + } + +} + + +/** + * Identity map for forwarding the fields based on their arriving times + */ +private[flink] class IdentityRowMap extends MapFunction[Row,Row] { + override def map(value:Row):Row ={ + value + } + } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 6f03becf526ad..36d5aad4d43de 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -29,6 +29,27 @@ class WindowAggregateTest extends TableTestBase { private val streamUtil: StreamTableTestUtil = streamTestUtil() streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) + + + @Test + def testSortProcessingTime() = { + + val sqlQuery = "SELECT a FROM MyTable ORDER BY procTime(), c" + + val expected = + unaryNode( + "DataStreamSort", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "1970-01-01 00:00:00 AS EXPR$1","c") + ), + term("sort fields", """[1, 2]""") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + @Test def testNonPartitionedProcessingTimeBoundedWindow() = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala index 3610898e35fff..55e6770e4dff6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala @@ -35,6 +35,7 @@ import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunct import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._ import org.apache.flink.types.Row import org.junit.Test +import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedSortProcessFunction class 
BoundedProcessingOverRangeProcessFunctionTest { @@ -280,6 +281,86 @@ class BoundedProcessingOverRangeProcessFunctionTest { testHarness.close() } + + + @Test + def testSortProcTimeHarnessPartitioned(): Unit = { + + val rT = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a","b","c","d","e")) + + val rTA = new RowTypeInfo(Array[TypeInformation[_]]( + LONG_TYPE_INFO), Array("count")) + val indexes = Array(1,2) + val orderings = Array[UntypedOrdering]( + new LongOrdering(LONG_TYPE_INFO.createComparator(true, null)), + new IntOrdering(INT_TYPE_INFO.createComparator(false, null)) ) + + + val rowComparator = new SortRowComparator(orderings,indexes) + + val processFunction = new KeyedProcessOperator[Integer,Row,Row]( + new ProcTimeBoundedSortProcessFunction( + 5, + rT, + rowComparator + )) + + val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,Row,Row]( + processFunction, + new TupleRowSelector(0), + BasicTypeInfo.INT_TYPE_INFO) + + testHarness.open(); + + testHarness.setProcessingTime(3) + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 1001)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 2002)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 2003)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 2004)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 2006)) + + //move the timestamp to ensure the execution + testHarness.setProcessingTime(1005) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + // all elements at the same proc timestamp have the same value + // elements should be sorted ascending on field 1 and descending on field 2 + // (10,0) (11,1) (12,2) (12,1) (12,0) + + expectedOutput.add(new StreamRecord( + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 4)) + expectedOutput.add(new StreamRecord( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 4)) + expectedOutput.add(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 4)) + expectedOutput.add(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 4)) + expectedOutput.add(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 4)) + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", + expectedOutput, result, new RowResultSortComparator(6)) + + testHarness.close() + + } + } object BoundedProcessingOverRangeProcessFunctionTest { From a826b17dfcd5a7ef2e6fa6fea4277595c457a39f Mon Sep 17 00:00:00 2001 From: rtudoran Date: Tue, 25 Apr 2017 19:34:12 +0200 Subject: [PATCH 2/3] Address PR remarks: - moved to RowComparator - removed type comparators wrappers - refactor tests - reuse sorting object --- .../nodes/datastream/DataStreamSort.scala | 90 +++-- .../rules/datastream/DataStreamSortRule.scala | 14 +- ...cala => ProcTimeSortProcessFunction.scala} | 19 +- .../table/runtime/aggregate/SortUtil.scala | 366 ++++++++---------- .../table/api/scala/stream/sql/SortTest.scala | 77 ++++ .../stream/sql/WindowAggregateTest.scala | 20 - ...ocessingOverRangeProcessFunctionTest.scala | 80 ---- .../ProcTimeSortProcessFunctionTest.scala | 163 ++++++++ 8 files 
changed, 470 insertions(+), 359 deletions(-) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/{ProcTimeBoundedSortProcessFunction.scala => ProcTimeSortProcessFunction.scala} (89%) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunctionTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala index 46cc6cbed3d33..b8632f5159551 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel } import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } @@ -39,7 +38,6 @@ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } import org.apache.flink.types.Row -import org.apache.calcite.rel.logical.LogicalSort import org.apache.calcite.sql.SqlAggFunction import org.apache.flink.table.plan.nodes.datastream.DataStreamRel import org.apache.flink.table.api.TableException @@ -57,18 +55,22 @@ import org.apache.calcite.rel.core.Sort import org.apache.flink.api.java.functions.NullByteKeySelector import org.apache.calcite.rel.RelFieldCollation.Direction import org.apache.flink.table.runtime.aggregate.SortUtil._ +import org.apache.calcite.rel.RelCollation +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.ExecutionConfig /** * Flink RelNode which matches along with Sort Rule. * - */ + */ class DataStreamSort( - sort: LogicalSort, + sortCollation: RelCollation, + sortOffset: RexNode, + sortFetch: RexNode, cluster: RelOptCluster, traitSet: RelTraitSet, inputNode: RelNode, rowRelDataType: RelDataType, - inputType: RelDataType, description: String) extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel { @@ -76,27 +78,31 @@ class DataStreamSort( override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamSort( - sort, + sortCollation, + sortOffset, + sortFetch, cluster, traitSet, inputs.get(0), rowRelDataType, - inputType, - description + sort.getId()) + description) } override def toString: String = { - s"Sort($sort)" + - " on fields: (${sort.collation.getFieldCollations})" + s"Sort(by: ($SortUtil.getSortFieldToString(sortCollation, rowRelDataType))," + + " offset: $SortUtil.getOffsetToString(sortOffset)," + + " fetch: $SortUtil.getFetchToString(sortFetch, sortOffset))" } - - override def explainTerms(pw: RelWriter): RelWriter = { + + override def explainTerms(pw: RelWriter) : RelWriter = { + + //need to identify time between others order fields. 
Time needs to be first sort element + checkTimeOrder() + super.explainTerms(pw) - .item("aggregate", sort) - .item("sort fields",sort.collation.getFieldCollations) - .itemIf("offset", sort.offset, sort.offset!=null) - .itemIf("fetch", sort.fetch, sort.fetch!=null) - .item("input", inputNode) + .item("orderBy", SortUtil.getSortFieldToString(sortCollation, rowRelDataType)) + .item("offset", SortUtil.getOffsetToString(sortOffset)) + .item("fetch", SortUtil.getFetchToString(sortFetch, sortOffset)) } override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { @@ -104,24 +110,26 @@ class DataStreamSort( val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) //need to identify time between others order fields. Time needs to be first sort element - val timeType = SortUtil.getTimeType(sort,inputType) + val timeType = SortUtil.getTimeType(sortCollation, rowRelDataType) //time ordering needs to be ascending - if (SortUtil.getTimeDirection(sort) != Direction.ASCENDING) { + if (SortUtil.getTimeDirection(sortCollation) != Direction.ASCENDING) { throw new TableException("SQL/Table supports only ascending time ordering") } - - - val (offset,fetch) = (sort.offset,sort.fetch) + val execCfg = tableEnv.execEnv.getConfig + //enable to extend for other types of aggregates that will not be implemented in a window timeType match { case _: ProcTimeType => - (offset,fetch) match { - case (o:Any,f:Any) => null // offset and fetch needs retraction - case (_,f:Any) => null // offset needs retraction - case (o:Any,_) => null // fetch needs retraction - case _ => createSortProcTime(inputDS) //sort can be done with/without retraction + (sortOffset,sortFetch) match { + case (o: Any, f: Any) => // offset and fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset and fetch") + case (_, f: Any) => // offset needs retraction + throw new TableException("SQL/Table does not support sort with fetch") + case (o: Any, _) => // fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset") + case _ => createSortProcTime(inputDS, execCfg) //sort can be done without retraction } case _: RowTimeType => throw new TableException("SQL/Table does not support sort on row time") @@ -135,18 +143,16 @@ class DataStreamSort( * Create Sort logic based on processing time */ def createSortProcTime( - inputDS: DataStream[Row]): DataStream[Row] = { + inputDS: DataStream[Row], + execCfg: ExecutionConfig): DataStream[Row] = { - - // get the output types - //Sort does not do project.= Hence it will output also the ordering proctime field - //[TODO]Do we need to drop some of the ordering fields? (implement a projection logic? val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] //if the order has secondary sorting fields in addition to the proctime - if( SortUtil.getSortFieldIndexList(sort).size > 1) { + if( SortUtil.getSortFieldIndexList(sortCollation).size > 1) { - val processFunction = SortUtil.createProcTimeSortFunction(sort,inputType) + val processFunction = SortUtil.createProcTimeSortFunction(sortCollation, + rowRelDataType, execCfg) inputDS .keyBy(new NullByteKeySelector[Row]) @@ -163,6 +169,22 @@ class DataStreamSort( } } + def checkTimeOrder() = { + //need to identify time between others order fields. 
Time needs to be first sort element + val timeType = SortUtil.getTimeType(sortCollation, rowRelDataType) + //time ordering needs to be ascending + if (SortUtil.getTimeDirection(sortCollation) != Direction.ASCENDING) { + throw new TableException("SQL/Table supports only ascending time ordering") + } + //enable to extend for other types of aggregates that will not be implemented in a window + timeType match { + case _: ProcTimeType => + case _: RowTimeType => + case _ => + throw new TableException("SQL/Table needs to have sort on time as first sort element") + + } + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala index 3952725632c1d..48caec6632b40 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala @@ -27,6 +27,7 @@ import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate import org.apache.calcite.rel.logical.LogicalSort import org.apache.flink.table.plan.nodes.datastream.DataStreamSort +import org.apache.calcite.rel.RelCollation /** * Rule to convert a LogicalSort into a DataStreamSort. @@ -43,20 +44,19 @@ class DataStreamSortRule } override def convert(rel: RelNode): RelNode = { - val calc: LogicalSort = rel.asInstanceOf[LogicalSort] + val calcSort: LogicalSort = rel.asInstanceOf[LogicalSort] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calcSort.getInput(0), DataStreamConvention.INSTANCE) - val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType - new DataStreamSort( - calc, + calcSort.collation, + calcSort.offset, + calcSort.fetch, rel.getCluster, traitSet, convInput, rel.getRowType, - inputRowType, - description + calc.getId()) + description) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala similarity index 89% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala index 832cf98bebd2e..65501c27cf8cf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala @@ -33,9 +33,10 @@ import org.apache.flink.api.common.state.MapState import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ListTypeInfo -import java.util.LinkedList import java.util.Comparator - +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator /** * Process Function used for the 
aggregate in bounded proctime sort without offset/fetch @@ -45,15 +46,16 @@ import java.util.Comparator * @param inputType It is used to mark the type of the incoming data * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation */ -class ProcTimeBoundedSortProcessFunction( +class ProcTimeSortProcessFunction( private val fieldCount: Int, private val inputType: TypeInformation[Row], - private val rowComparator:Comparator[Row]) + private val rowComparator: CollectionRowComparator) extends ProcessFunction[Row, Row] { Preconditions.checkNotNull(rowComparator) private var stateEventsBuffer: ListState[Row] = _ + private val sortArray: ArrayList[Row] = new ArrayList[Row] override def open(config: Configuration) { val sortDescriptor = new ListStateDescriptor[Row]("sortState", inputType) @@ -82,18 +84,17 @@ class ProcTimeBoundedSortProcessFunction( out: Collector[Row]): Unit = { var i = 0 - val sortList = new LinkedList[Row] val iter = stateEventsBuffer.get.iterator() while(iter.hasNext()) { - sortList.add(iter.next()) + sortArray.add(iter.next()) } //if we do not rely on java collections to do the sort we could implement //an insertion sort as we get the elements from the state - sortList.sort(rowComparator) + Collections.sort(sortArray, rowComparator) //no retraction now @@ -101,8 +102,8 @@ class ProcTimeBoundedSortProcessFunction( //we need to build the output and emit the events in order var iElemenets = 0 - while (iElemenets < sortList.size) { - out.collect(sortList.get(iElemenets)) + while (iElemenets < sortArray.size) { + out.collect(sortArray.get(iElemenets)) iElemenets += 1 } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala index 11795450771a4..f6ca6f4c551b9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala @@ -18,10 +18,9 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.types.Row import org.apache.calcite.rel.`type`._ -import org.apache.calcite.rel.logical.LogicalSort +import org.apache.calcite.rel.RelCollation import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.functions.AggregateFunction import org.apache.calcite.sql.`type`.SqlTypeName @@ -41,6 +40,14 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} import java.math.{BigDecimal=>JBigDecimal} import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer + +import scala.collection.JavaConverters._ /** * Class represents a collection of helper methods to build the sort logic. 
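The hunk below replaces the hand-written UntypedOrdering wrappers with Flink's composite RowComparator. As a rough sketch of what the reworked helpers assemble (arity, key positions and directions borrowed from the harness test later in this patch, not from the patch code itself):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
    import org.apache.flink.api.java.typeutils.runtime.RowComparator

    val execCfg = new ExecutionConfig
    // one typed comparator per sort key: field 1 ascending, field 2 descending
    val fieldComps = Array(
      LONG_TYPE_INFO.createComparator(true, execCfg),
      INT_TYPE_INFO.createComparator(false, execCfg)
    ).map(_.asInstanceOf[TypeComparator[AnyRef]])
    val rowComp = new RowComparator(
      5,                                    // arity of the full row
      Array(1, 2),                          // positions of the sort keys
      fieldComps,
      new Array[TypeSerializer[AnyRef]](0), // serializers: only for serialized comparison
      Array(true, false))                   // per-key direction, true = ascending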
@@ -52,159 +59,164 @@ object SortUtil { /** * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting * elements based on proctime and potentially other fields - * @param calcSort Sort logical object + * @param collationSort The Sort collation list * @param inputType input row type + * @param execCfg table environment execution configuration * @return org.apache.flink.streaming.api.functions.ProcessFunction */ private[flink] def createProcTimeSortFunction( - calcSort: LogicalSort, - inputType: RelDataType): ProcessFunction[Row, Row] = { + collationSort: RelCollation, + inputType: RelDataType, + execCfg: ExecutionConfig): ProcessFunction[Row, Row] = { - val keySortFields = getSortFieldIndexList(calcSort) - val keySortDirections = getSortFieldDirectionList(calcSort) + val keySortFields = getSortFieldIndexList(collationSort) + val keySortDirections = getSortFieldDirectionList(collationSort) val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo] - val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections) - - //drop time from comparison + //drop time from comparison val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) - val orderingsNoTime = orderings.slice(1, keySortFields.size) + val keyDirectionsNoTime = keySortDirections.slice(1, keySortDirections.size) + val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) + val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) - val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime) + val fieldComps = createFieldComparators(inputType, keyIndexesNoTime, keyDirectionsNoTime, execCfg) + val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] - new ProcTimeBoundedSortProcessFunction( + val rowComp = createRowComparator(inputType, keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime) + val collectionRowComparator = new CollectionRowComparator(rowComp) + + + new ProcTimeSortProcessFunction( inputType.getFieldCount, inputRowType, - rowComparator) + collectionRowComparator) } - /** - * Function creates a row comparator for the sorting fields based on - * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo] - * @param keyIndex the indexes of the fields on which the sorting is done. - * First is expected to be the time - * @param orderings the [UntypedOrdering] objects - * @return Array of ordering objects - */ - def createRowSortComparator(keyIndex: Array[Int], - orderings:Array[UntypedOrdering]): Comparator[Row] = { - - new SortRowComparator(orderings,keyIndex) - } /** - * Function creates comparison objects with embeded type casting + * Function creates comparison objects based on the field types * @param inputType input row type * @param keyIndex the indexes of the fields on which the sorting is done. - * First is expected to be the time - * @return Array of ordering objects + * @param orderDirection the directions of each sort field. 
+ * @param execConfig the configuration environment + * @return Array of TypeComparator objects */ - def createOrderingComparison(inputType: RelDataType, - keyIndex: Array[Int], - orderDirection: Array[Direction]): Array[UntypedOrdering] = { - - var i = 0 - val orderings = new Array[UntypedOrdering](keyIndex.size) - - while (i < keyIndex.size) { - val sqlTypeName = inputType.getFieldList.get(keyIndex(i)).getType.getSqlTypeName - - orderings(i) = sqlTypeName match { - case TINYINT => new ByteOrdering( - BYTE_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case SMALLINT => new SmallOrdering( - SHORT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case INTEGER => new IntOrdering( - INT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case BIGINT => new LongOrdering( - LONG_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case FLOAT => new FloatOrdering( - FLOAT_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case DOUBLE => new DoubleOrdering( - DOUBLE_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case DECIMAL => new DecimalOrdering( - BIG_DEC_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case VARCHAR | CHAR => new StringOrdering( - STRING_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - //should be updated when times are merged in master branch based on their types - case TIMESTAMP => new TimestampOrdering( - LONG_TYPE_INFO.createComparator(orderDirection(i)==Direction.ASCENDING, null)) - case sqlType: SqlTypeName => - throw new TableException("Sort aggregate does no support type:" + sqlType) + def createFieldComparators( + inputType: RelDataType, + keyIndex: Array[Int], + orderDirection: Array[Direction], + execConfig: ExecutionConfig): Array[TypeComparator[_]] = { + + var iOrder = 0 + for (i <- keyIndex) yield { + + val order = if (orderDirection(iOrder) == Direction.ASCENDING) true else false + iOrder += 1 + val fieldTypeInfo = FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(i).getType) + fieldTypeInfo match { + case a: AtomicType[_] => a.createComparator(order, execConfig) + case _ => throw new TableException(s"Unsupported field type $fieldTypeInfo to sort on.") } - i += 1 } - - orderings } + + /** + * Function creates a RowComparator based on the typed comparators + * @param inputRowType input row type + * @param fieldIdxs the indexes of the fields on which the sorting is done. + * @param fieldComps the array of typed comparators + * @param fieldOrders the directions of each sort field (true = ASC). 
+ * @return A Row TypeComparator object + */ + def createRowComparator( + inputRowType: RelDataType, + fieldIdxs: Array[Int], + fieldComps: Array[TypeComparator[AnyRef]], + fieldOrders: Array[Boolean]): TypeComparator[Row] = { + + val rowComp = new RowComparator( + inputRowType.getFieldCount, + fieldIdxs, + fieldComps, + new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons + fieldOrders) + + rowComp +} + /** * Function returns the array of indexes for the fields on which the sort is done - * @param calcSort The LogicalSort object + * @param collationSort The Sort collation list * @return [Array[Int]] */ - def getSortFieldIndexList(calcSort: LogicalSort): Array[Int] = { - val keyFields = calcSort.collation.getFieldCollations - var i = 0 - val keySort = new Array[Int](keyFields.size()) - while (i < keyFields.size()) { - keySort(i) = keyFields.get(i).getFieldIndex - i += 1 - } - - keySort + def getSortFieldIndexList(collationSort: RelCollation): Array[Int] = { + val keyFields = collationSort.getFieldCollations.toArray() + //val keyFields = collationSort.getFieldCollations.toArray().asInstanceOf[Array[RelFieldCollation]] + for (f <- keyFields) yield f.asInstanceOf[RelFieldCollation].getFieldIndex } /** * Function returns the array of sort direction for each of the sort fields - * @param calcSort The LogicalSort object + * @param collationSort The Sort collation list + * @return [Array[Direction]] + */ + def getSortFieldDirectionList(collationSort: RelCollation): Array[Direction] = { + val keyFields = collationSort.getFieldCollations.toArray() + for(f <- keyFields) yield f.asInstanceOf[RelFieldCollation].getDirection + } + + /** + * Function returns the array of sort direction for each of the sort fields + * @param collationSort The Sort collation list * @return [Array[Direction]] */ - def getSortFieldDirectionList(calcSort: LogicalSort): Array[Direction] = { - val keyFields = calcSort.collation.getFieldCollations + def getSortFieldDirectionBooleanList(collationSort: RelCollation): Array[Boolean] = { + val keyFields = collationSort.getFieldCollations var i = 0 - val keySortDirection = new Array[Direction](keyFields.size()) + val keySortDirection = new Array[Boolean](keyFields.size()) while (i < keyFields.size()) { - keySortDirection(i) = getDirection(calcSort,i) + keySortDirection(i) = + if (getDirection(collationSort,i) == Direction.ASCENDING) true else false i += 1 } keySortDirection } - + + /** * Function returns the direction type of the time in order clause. - * @param calcSort The LogicalSort object - * @return [Array[Int]] + * @param collationSort The Sort collation list of objects + * @return [org.apache.calcite.rel.RelFieldCollation.Direction] */ - def getTimeDirection(calcSort: LogicalSort):Direction = { - calcSort.getCollationList.get(0).getFieldCollations.get(0).direction + def getTimeDirection(collationSort: RelCollation): Direction = { + collationSort.getFieldCollations.get(0).direction } /** * Function returns the time type in order clause. 
* Expectation is that it is the primary sort field - * @param calcSort The LogicalSort object + * @param collationSort The Sort collation list * @param rowType The data type of the input * @return [org.apache.calcite.rel.type.RelDataType] */ - def getTimeType(calcSort: LogicalSort, rowType: RelDataType): RelDataType = { + def getTimeType(collationSort: RelCollation, rowType: RelDataType): RelDataType = { - //need to identify time between others order fields - // - val ind = calcSort.getCollationList.get(0).getFieldCollations.get(0).getFieldIndex + //need to identify time between others ordering fields + val ind = collationSort.getFieldCollations.get(0).getFieldIndex rowType.getFieldList.get(ind).getValue } /** * Function returns the direction type of a field in order clause. - * @param calcSort The LogicalSort object + * @param collationSort The Sort collation list * @return [org.apache.calcite.rel.RelFieldCollation.Direction] */ - def getDirection(calcSort: LogicalSort, sortField:Int):Direction = { + def getDirection(collationSort: RelCollation, sortField:Int): Direction = { - calcSort.getCollationList.get(0).getFieldCollations.get(sortField).direction match { + collationSort.getFieldCollations.get(sortField).direction match { case Direction.ASCENDING => Direction.ASCENDING case Direction.DESCENDING => Direction.DESCENDING case _ => throw new TableException("SQL/Table does not support such sorting") @@ -212,137 +224,73 @@ object SortUtil { } -} - - -/** - * Untyped interface for defining comparison method that can be override by typed implementations - * Each typed implementation will cast the generic type to the implicit ordering type used - */ - -trait UntypedOrdering extends Serializable{ - def compare(x: Any, y: Any): Int - -} - -class LongOrdering(private val ord: TypeComparator[JLong]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xL = x.asInstanceOf[JLong] - val yL = y.asInstanceOf[JLong] - ord.compare(xL, yL) - } -} - -class IntOrdering(private val ord: TypeComparator[JInt]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xI = x.asInstanceOf[JInt] - val yI = y.asInstanceOf[JInt] - ord.compare(xI, yI) - } -} - -class FloatOrdering(private val ord: TypeComparator[JFloat]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xF = x.asInstanceOf[JFloat] - val yF = y.asInstanceOf[JFloat] - ord.compare(xF, yF) - } -} - -class DoubleOrdering(private val ord: TypeComparator[JDouble]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xD = x.asInstanceOf[JDouble] - val yD = y.asInstanceOf[JDouble] - ord.compare(xD, yD) + def directionToOrder(direction: Direction) = { + direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + case _ => throw new IllegalArgumentException("Unsupported direction.") + } } -} - -class DecimalOrdering(private val ord: TypeComparator[JBigDecimal]) extends UntypedOrdering { + + def getSortFieldToString(collationSort: RelCollation, rowRelDataType: RelDataType): String = { + val fieldCollations = collationSort.getFieldCollations.asScala + .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) - override def compare(x: Any, y: Any): Int = { - val xBD = x.asInstanceOf[JBigDecimal] - val yBD = y.asInstanceOf[JBigDecimal] - ord.compare(xBD, yBD) + val sortFieldsToString = fieldCollations + .map(col => s"${ + 
rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + + sortFieldsToString } -} - -class ByteOrdering(private val ord: TypeComparator[JByte]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xB = x.asInstanceOf[JByte] - val yB = y.asInstanceOf[JByte] - ord.compare(xB, yB) + + def getOffsetToString(offset: RexNode): String = { + val offsetToString = s"$offset" + offsetToString } -} - -class SmallOrdering(private val ord: TypeComparator[JShort]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xS = x.asInstanceOf[JShort] - val yS = y.asInstanceOf[JShort] - ord.compare(xS, yS) + + def getFetchToString(fetch: RexNode, offset: RexNode): String = { + val limitEnd = getFetchLimitEnd(fetch, offset) + val fetchToString = if (limitEnd == Long.MaxValue) { + "unlimited" + } else { + s"$limitEnd" + } + fetchToString } -} - -class StringOrdering(private val ord: TypeComparator[JString]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xS = x.asInstanceOf[JString] - val yS = y.asInstanceOf[JString] - ord.compare(xS, yS) + + def getFetchLimitEnd (fetch: RexNode, offset: RexNode): Long = { + val limitEnd: Long = if (fetch != null) { + RexLiteral.intValue(fetch) + getFetchLimitStart(fetch, offset) + } else { + Long.MaxValue + } + limitEnd } -} - - -/** - * Ordering object for timestamps. As there is no implicit Ordering for [java.sql.Timestamp] - * we need to compare based on the Long value of the timestamp - */ -class TimestampOrdering(private val ord: TypeComparator[JLong]) extends UntypedOrdering { - - override def compare(x: Any, y: Any): Int = { - val xTs = x.asInstanceOf[Timestamp] - val yTs = y.asInstanceOf[Timestamp] - ord.compare(xTs.getTime, yTs.getTime) + + def getFetchLimitStart (fetch: RexNode, offset: RexNode): Long = { + val limitStart: Long = if (offset != null) { + RexLiteral.intValue(offset) + } else { + 0L + } + limitStart } + } - /** - * Called every time when a sort operation is done. 
It applies the Comparator objects in cascade
- *
- * @param orderedComparators the sort Comparator objects with type casting
- * @param keyIndexes the sort index fields on which to apply the comparison on the inputs
+ * Wraps a Row TypeComparator as a [[java.util.Comparator]] so that Java
+ * collection sorting (e.g. Collections.sort) can be applied to buffered rows
  */
-class SortRowComparator(
-  private val orderedComparators:Array[UntypedOrdering],
-  private val keyIndexes:Array[Int]) extends Comparator[Row] with Serializable {
+class CollectionRowComparator(
+    private val rowComp: TypeComparator[Row]) extends Comparator[Row] with Serializable {
+
   override def compare(arg0:Row, arg1:Row):Int = {
-
-    var i = 0
-    var result:Int = 0
-    while (i  i += 1 //same key and need to sort on consequent keys
-      case g => { result = g //in case the case does not return the result
-        i = keyIndexes.size // exit and return the result
-      }
-      }
-    }
-    result //all sort fields were equal, hence elements are equal
+    rowComp.compare(arg0, arg1)
   }
-
 }
+

 /**
  * Identity map for forwarding the fields based on their arriving times
  */
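A quick orientation on how these pieces compose may help. The sketch below wires a RowComparator over two key fields into the CollectionRowComparator wrapper and hands it to Collections.sort, mirroring the harness test later in this series; the arity, field indexes, and type infos are illustrative only:

import java.util.{ArrayList => JArrayList, Collections}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
import org.apache.flink.api.java.typeutils.runtime.RowComparator
import org.apache.flink.table.runtime.aggregate.CollectionRowComparator
import org.apache.flink.types.Row

// Sketch: compare rows on field 1 (ascending, Long) and field 2 (descending,
// Int), then sort a buffered list with plain Java collections through the
// CollectionRowComparator wrapper added above. Values here are illustrative.
object CollectionRowComparatorSketch extends App {
  val fieldComps = Array[TypeComparator[AnyRef]](
    LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
    INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]])

  val rowComp = new RowComparator(
    5,                                    // total arity of the rows
    Array(1, 2),                          // key field indexes
    fieldComps,
    new Array[TypeSerializer[AnyRef]](0), // serializers: only for serialized comparison
    Array(true, false))                   // ascending flag per key field

  val rows = new JArrayList[Row]()        // rows buffered by the process function
  Collections.sort(rows, new CollectionRowComparator(rowComp))
}

Reusing Flink's RowComparator keeps null handling and type-specific comparison in one place, instead of the per-type comparator cascade that the removed UntypedOrdering classes implemented by hand.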
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
new file mode 100644
index 0000000000000..2be4eaab7ce36
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class SortTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+  @Test
+  def testSortProcessingTime() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY procTime(), c"
+
+    val expected =
+      unaryNode(
+        "DataStreamSort",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a", "1970-01-01 00:00:00 AS EXPR$1", "c")
+        ),
+        term("orderBy", "EXPR$1 ASC, c ASC], offset=[null], fetch=[unlimited")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // a descending sort on processing time is unsupported and must be rejected
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeDesc() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY procTime() DESC, c"
+    streamUtil.verifySql(sqlQuery, "")
+  }
+
+  // time must be the primary sort field, so a secondary time field must be rejected
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeSecondaryField() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY c, procTime()"
+    streamUtil.verifySql(sqlQuery, "")
+  }
+
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 36d5aad4d43de..8dc7524c8fca5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -30,26 +30,6 @@ class WindowAggregateTest extends TableTestBase {
   streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
 
-
-  @Test
-  def testSortProcessingTime() = {
-
-    val sqlQuery = "SELECT a FROM MyTable ORDER BY procTime(), c"
-
-    val expected =
-      unaryNode(
-        "DataStreamSort",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "a", "1970-01-01 00:00:00 AS EXPR$1","c")
-        ),
-        term("sort fields", """[1, 2]""")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
   @Test
   def testNonPartitionedProcessingTimeBoundedWindow() =
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 55e6770e4dff6..08177ff967aaa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -35,7 +35,6 @@ import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunct
 import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
 import org.apache.flink.types.Row
 import org.junit.Test
-import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedSortProcessFunction
 
 class BoundedProcessingOverRangeProcessFunctionTest {
@@ -282,85 +281,6 @@
class BoundedProcessingOverRangeProcessFunctionTest { } - - @Test - def testSortProcTimeHarnessPartitioned(): Unit = { - - val rT = new RowTypeInfo(Array[TypeInformation[_]]( - INT_TYPE_INFO, - LONG_TYPE_INFO, - INT_TYPE_INFO, - STRING_TYPE_INFO, - LONG_TYPE_INFO), - Array("a","b","c","d","e")) - - val rTA = new RowTypeInfo(Array[TypeInformation[_]]( - LONG_TYPE_INFO), Array("count")) - val indexes = Array(1,2) - val orderings = Array[UntypedOrdering]( - new LongOrdering(LONG_TYPE_INFO.createComparator(true, null)), - new IntOrdering(INT_TYPE_INFO.createComparator(false, null)) ) - - - val rowComparator = new SortRowComparator(orderings,indexes) - - val processFunction = new KeyedProcessOperator[Integer,Row,Row]( - new ProcTimeBoundedSortProcessFunction( - 5, - rT, - rowComparator - )) - - val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,Row,Row]( - processFunction, - new TupleRowSelector(0), - BasicTypeInfo.INT_TYPE_INFO) - - testHarness.open(); - - testHarness.setProcessingTime(3) - - // timestamp is ignored in processing time - testHarness.processElement(new StreamRecord( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 1001)) - testHarness.processElement(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 2002)) - testHarness.processElement(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 2003)) - testHarness.processElement(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 2004)) - testHarness.processElement(new StreamRecord( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 2006)) - - //move the timestamp to ensure the execution - testHarness.setProcessingTime(1005) - - val result = testHarness.getOutput - - val expectedOutput = new ConcurrentLinkedQueue[Object]() - - // all elements at the same proc timestamp have the same value - // elements should be sorted ascending on field 1 and descending on field 2 - // (10,0) (11,1) (12,2) (12,1) (12,0) - - expectedOutput.add(new StreamRecord( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 4)) - expectedOutput.add(new StreamRecord( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 4)) - expectedOutput.add(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 4)) - expectedOutput.add(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 4)) - expectedOutput.add(new StreamRecord( - Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 4)) - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", - expectedOutput, result, new RowResultSortComparator(6)) - - testHarness.close() - - } - } object BoundedProcessingOverRangeProcessFunctionTest { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunctionTest.scala new file mode 100644 index 0000000000000..492229be20d02 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunctionTest.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.util.Comparator +import java.util.concurrent.ConcurrentLinkedQueue +import java.lang.{Integer => JInt, Long => JLong} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction +import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunctionTest._ +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.TypeComparator + +class ProcTimeSortProcessFunctionTest{ + + + @Test + def testSortProcTimeHarnessPartitioned(): Unit = { + + val rT = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a","b","c","d","e")) + + val rTA = new RowTypeInfo(Array[TypeInformation[_]]( + LONG_TYPE_INFO), Array("count")) + val indexes = Array(1,2) + + val fieldComps = Array[TypeComparator[AnyRef]]( + LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]], + INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] ) + val booleanOrders = Array(true, false) + + + val rowComp = new RowComparator( + rT.getTotalFields, + indexes, + fieldComps, + new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons + booleanOrders) + + val collectionRowComparator = new CollectionRowComparator(rowComp) + + + val processFunction = new KeyedProcessOperator[Integer,Row,Row]( + new ProcTimeSortProcessFunction( + 5, + rT, + collectionRowComparator)) + + val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,Row,Row]( + processFunction, + new TupleRowSelector(0), + BasicTypeInfo.INT_TYPE_INFO) + + testHarness.open(); + + testHarness.setProcessingTime(3) + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 1001)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 2002)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 2003)) + testHarness.processElement(new StreamRecord( + Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 2004)) + testHarness.processElement(new 
StreamRecord(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 2006))
+
+    //move the timestamp to ensure the execution
+    testHarness.setProcessingTime(1005)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value
+    // elements should be sorted ascending on field 1 and descending on field 2
+    // (10,0) (11,1) (12,2) (12,1) (12,0)
+
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), 4))
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
+      expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+
+  }
+
+}
+
+object ProcTimeSortProcessFunctionTest {
+
+/**
+ * Return 0 for equal Rows and non zero for different rows
+ */
+class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+
+  override def compare(o1: Object, o2: Object):Int = {
+
+    if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+      // watermark is not expected
+      -1
+    } else {
+      val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
+      val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+      row1.toString.compareTo(row2.toString)
+    }
+  }
+}
+
+/**
+ * Simple test class that returns a specified field as the selector function
+ */
+class TupleRowSelector(
+    private val selectorField:Int) extends KeySelector[Row, Integer] {
+
+  override def getKey(value: Row): Integer = {
+    value.getField(selectorField).asInstanceOf[Integer]
+  }
+}
+
+}

From 77df4db5588f760a208a7ecff5e2a3fa03e858a8 Mon Sep 17 00:00:00 2001
From: rtudoran
Date: Thu, 27 Apr 2017 22:38:38 +0200
Subject: [PATCH 3/3] Add rowtime order by support and corresponding tests

---
 .../nodes/datastream/DataStreamSort.scala     |  34 +++-
 .../ProcTimeSortProcessFunction.scala         |   5 +-
 .../RowTimeSortProcessFunction.scala          | 158 ++++++++++++++++++
 .../table/runtime/aggregate/SortUtil.scala    |  49 +++++-
 .../table/api/scala/stream/sql/SortTest.scala |  19 +++
 .../api/scala/stream/sql/SqlITCase.scala      |  72 ++++++++
 6 files changed, 330 insertions(+), 7 deletions(-)
 create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
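Before the file-by-file changes, a usage sketch of the feature this patch enables. It mirrors the testEventTimeOrderBy IT case added at the end of the patch; the environment setup is abbreviated, and the table T1 is assumed to be registered with an event-time source exactly as in that test:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Sketch: sort a registered table T1 on its event-time attribute, breaking
// ties within a timestamp by field b. rowTime() is the prototype's marker
// function for referencing event time in ORDER BY.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// ... register table T1 on tEnv as in testEventTimeOrderBy below ...
val result = tEnv.sql("SELECT b FROM T1 ORDER BY rowTime(), b ASC").toDataStream[Row]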
TableException("SQL/Table does not support sort with offset and fetch") + case (_, f: Any) => // offset needs retraction + throw new TableException("SQL/Table does not support sort with fetch") + case (o: Any, _) => // fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset") + case _ => createSortRowTime(inputDS, execCfg) //sort can be done without retraction + } case _ => throw new TableException("SQL/Table needs to have sort on time as first sort element") } @@ -169,6 +177,30 @@ class DataStreamSort( } } + /** + * Create Sort logic based on row time + */ + def createSortRowTime( + inputDS: DataStream[Row], + execCfg: ExecutionConfig): DataStream[Row] = { + + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + + val processFunction = SortUtil.createRowTimeSortFunction(sortCollation, + rowRelDataType, execCfg) + + inputDS + .keyBy(new NullByteKeySelector[Row]) + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(rowTypeInfo) + .asInstanceOf[DataStream[Row]] + + } + + /** + * Function is used to check at verification time if the SQL syntax is supported + */ + def checkTimeOrder() = { //need to identify time between others order fields. Time needs to be first sort element val timeType = SortUtil.getTimeType(sortCollation, rowRelDataType) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala index 65501c27cf8cf..1e53705b234ea 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala @@ -86,9 +86,8 @@ class ProcTimeSortProcessFunction( var i = 0 val iter = stateEventsBuffer.get.iterator() - - while(iter.hasNext()) - { + sortArray.clear() + while(iter.hasNext()) { sortArray.add(iter.next()) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala new file mode 100644 index 0000000000000..9366893bb226c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
new file mode 100644
index 0000000000000..9366893bb226c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo}
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.configuration.Configuration
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import java.util.{List => JList, ArrayList => JArrayList}
+
+/**
+ * ProcessFunction to sort a bounded rowtime
+ * [[org.apache.flink.streaming.api.datastream.DataStream]] without offset/fetch
+ *
+ * @param fieldCount number of fields of the current element to forward
+ * @param inputRowType type of the incoming rows
+ * @param rowComparator the [[java.util.Comparator]] used to sort rows within a timestamp
+ */
+class RowTimeSortProcessFunction(
+  private val fieldCount: Int,
+  private val inputRowType: RowTypeInfo,
+  private val rowComparator: CollectionRowComparator)
+    extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private val sortArray: ArrayList[Row] = new ArrayList[Row]
+
+  // the state which keeps all the events that are not expired.
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + + // the state which keeps the last triggering timestamp to filter late events + private var lastTriggeringTsState: ValueState[Long] = _ + + + override def open(config: Configuration) { + + val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] + val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType) + + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( + "dataState", + keyTypeInformation, + valueTypeInformation) + + dataState = getRuntimeContext.getMapState(mapStateDescriptor) + + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + } + + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + + // triggering timestamp for trigger calculation + val triggeringTs = ctx.timestamp + + val lastTriggeringTs = lastTriggeringTsState.value + + // check if the data is expired, if not, save the data and register event time timer + if (triggeringTs > lastTriggeringTs) { + val data = dataState.get(triggeringTs) + if (null != data) { + data.add(input) + dataState.put(triggeringTs, data) + } else { + val data = new JArrayList[Row] + data.add(input) + dataState.put(triggeringTs, data) + // register event time timer + ctx.timerService.registerEventTimeTimer(triggeringTs) + } + } + } + + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + + // gets all window data from state for the calculation + val inputs: JList[Row] = dataState.get(timestamp) + + if (null != inputs) { + + var dataListIndex = 0 + + // no retraction needed for time order sort + + //no selection of offset/fetch + + dataListIndex = 0 + sortArray.clear() + while (dataListIndex < inputs.size()) { + val curRow = inputs.get(dataListIndex) + sortArray.add(curRow) + dataListIndex += 1 + } + + //if we do not rely on java collections to do the sort we could implement + //an insertion sort as we get the elements from the state + Collections.sort(sortArray, rowComparator) + + + //we need to build the output and emit the events in order + dataListIndex = 0 + while (dataListIndex < sortArray.size) { + out.collect(sortArray.get(dataListIndex)) + dataListIndex += 1 + } + + //we need to clear the events processed + dataState.remove(timestamp) + lastTriggeringTsState.update(timestamp) + + } + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala index f6ca6f4c551b9..33ce4b09ca8ad 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala @@ -56,6 +56,48 @@ import scala.collection.JavaConverters._ object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param 
inputType input row type
+   * @param execCfg table environment execution configuration
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createRowTimeSortFunction(
+    collationSort: RelCollation,
+    inputType: RelDataType,
+    execCfg: ExecutionConfig): ProcessFunction[Row, Row] = {
+
+    val keySortFields = getSortFieldIndexList(collationSort)
+    val keySortDirections = getSortFieldDirectionList(collationSort)
+
+    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+    //drop time from comparison as we sort on time in the states and result emission
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val keyDirectionsNoTime = keySortDirections.slice(1, keySortDirections.size)
+    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
+    val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size)
+
+    val fieldComps = createFieldComparators(inputType,
+      keyIndexesNoTime, keyDirectionsNoTime, execCfg)
+    val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
+
+    val rowComp = createRowComparator(inputType,
+      keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime)
+    val collectionRowComparator = new CollectionRowComparator(rowComp)
+
+    new RowTimeSortProcessFunction(
+      inputType.getFieldCount,
+      inputRowType,
+      collectionRowComparator)
+
+  }
+
 /**
  * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
  * elements based on proctime and potentially other fields
@@ -80,10 +122,12 @@ object SortUtil {
     val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
     val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size)
 
-    val fieldComps = createFieldComparators(inputType, keyIndexesNoTime, keyDirectionsNoTime, execCfg)
+    val fieldComps = createFieldComparators(inputType,
+      keyIndexesNoTime, keyDirectionsNoTime, execCfg)
     val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
 
-    val rowComp = createRowComparator(inputType, keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime)
+    val rowComp = createRowComparator(inputType,
+      keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime)
     val collectionRowComparator = new CollectionRowComparator(rowComp)
 
@@ -154,7 +198,6 @@ object SortUtil {
    */
   def getSortFieldIndexList(collationSort: RelCollation): Array[Int] = {
     val keyFields = collationSort.getFieldCollations.toArray()
-    //val keyFields = collationSort.getFieldCollations.toArray().asInstanceOf[Array[RelFieldCollation]]
     for (f <- keyFields) yield f.asInstanceOf[RelFieldCollation].getFieldIndex
   }
 
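A note on the slice(1, ...) calls above: the leading field of the collation is the time attribute, and ordering on it is already guaranteed by the timestamp-keyed state and the timer-driven emission, so only the remaining fields are handed to the row comparator. A small self-contained sketch, with field indexes invented for the example:

// Sketch: for ORDER BY rowtime, b DESC, c ASC over illustrative field
// indexes (rowtime = 3, b = 1, c = 2), the leading time field is dropped
// before building the comparator; it is sorted implicitly by emission order.
object DropTimeFieldSketch extends App {
  val keySortFields = Array(3, 1, 2)
  val booleanOrderings = Array(true, false, true)

  val keyIndexesNoTime = keySortFields.slice(1, keySortFields.length)
  val directionsNoTime = booleanOrderings.slice(1, booleanOrderings.length)

  println(keyIndexesNoTime.mkString(", "))  // 1, 2
  println(directionsNoTime.mkString(", "))  // false, true
}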
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
index 2be4eaab7ce36..a3b8fcdccc407 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
@@ -50,6 +50,25 @@ class SortTest extends TableTestBase {
 
   }
 
+  @Test
+  def testSortRowTime() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY rowTime(), c"
+
+    val expected =
+      unaryNode(
+        "DataStreamSort",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a", "1970-01-01 00:00:00 AS EXPR$1", "c")
+        ),
+        term("orderBy", "EXPR$1 ASC, c ASC], offset=[null], fetch=[unlimited")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
   @Test
   def testSortProcessingTimeDesc() = {
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0f455a5..435f20fcaac18 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -1140,6 +1140,78 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testEventTimeOrderBy(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 65, "Hello"))),
+      Left((6000L, (6L, 6, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((8500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 7, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    //env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT b FROM T1 ORDER BY RowTime(), b ASC"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1970-01-01 00:00:00.0", "15,1970-01-01 00:00:00.0", "16,1970-01-01 00:00:00.0",
+      "2,1970-01-01 00:00:00.0", "2,1970-01-01 00:00:00.0", "3,1970-01-01 00:00:00.0",
+      "3,1970-01-01 00:00:00.0",
+      "4,1970-01-01 00:00:00.0",
+      "5,1970-01-01 00:00:00.0",
+      "6,1970-01-01 00:00:00.0", "65,1970-01-01 00:00:00.0",
+      "18,1970-01-01 00:00:00.0", "7,1970-01-01 00:00:00.0", "9,1970-01-01 00:00:00.0",
+      "7,1970-01-01 00:00:00.0", "17,1970-01-01 00:00:00.0", "77,1970-01-01 00:00:00.0",
+      "18,1970-01-01 00:00:00.0",
+      "8,1970-01-01 00:00:00.0",
+      "20,1970-01-01 00:00:00.0")
+    assertEquals(expected, StreamITCase.testResults)
+  }
+
 }
 
 object SqlITCase {