Skip to content

Commit

Permalink
[FLINK-6200] [table] Add support for unbounded event-time OVER RANGE …
Browse files Browse the repository at this point in the history
…window.

This closes #3649.
  • Loading branch information
hongyuhong authored and fhueske committed Mar 30, 2017
1 parent aa3c395 commit 44f9c76
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 49 deletions.
Expand Up @@ -127,14 +127,8 @@ class DataStreamOverAggregate(
// row-time OVER window
if (overWindow.lowerBound.isPreceding &&
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
if (overWindow.isRows) {
// unbounded preceding OVER ROWS window
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
} else {
// unbounded preceding OVER RANGE window
throw new TableException(
"row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
}
// ROWS/RANGE clause unbounded OVER window
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS, overWindow.isRows)
} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
if (overWindow.isRows) {
Expand Down Expand Up @@ -202,8 +196,8 @@ class DataStreamOverAggregate(

def createBoundedAndCurrentRowOverWindow(
inputDS: DataStream[Row],
isRangeClause: Boolean = false,
isRowTimeType: Boolean = false): DataStream[Row] = {
isRangeClause: Boolean,
isRowTimeType: Boolean): DataStream[Row] = {

val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
Expand Down Expand Up @@ -247,7 +241,8 @@ class DataStreamOverAggregate(
}

def createUnboundedAndCurrentRowEventTimeOverWindow(
inputDS: DataStream[Row]): DataStream[Row] = {
inputDS: DataStream[Row],
isRows: Boolean): DataStream[Row] = {

val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
Expand All @@ -258,7 +253,8 @@ class DataStreamOverAggregate(

val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
namedAggregates,
inputType)
inputType,
isRows)

val result: DataStream[Row] =
// partitioned aggregation
Expand Down
Expand Up @@ -152,7 +152,8 @@ object AggregateUtil {
*/
private[flink] def createUnboundedEventTimeOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
inputType: RelDataType,
isRows: Boolean): UnboundedEventTimeOverProcessFunction = {

val (aggFields, aggregates) =
transformToAggregateFunctions(
Expand All @@ -162,12 +163,23 @@ object AggregateUtil {

val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)

new UnboundedEventTimeOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
if (isRows) {
// ROWS unbounded over process function
new UnboundedEventTimeRowsOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
} else {
// RANGE unbounded over process function
new UnboundedEventTimeRangeOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
}
}

/**
Expand Down
Expand Up @@ -41,7 +41,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
* @param inputType the type of the input rows that are saved in state
*
*/
class UnboundedEventTimeOverProcessFunction(
abstract class UnboundedEventTimeOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
private val aggFields: Array[Int],
private val forwardedFieldCount: Int,
Expand All @@ -53,7 +53,7 @@ class UnboundedEventTimeOverProcessFunction(
Preconditions.checkNotNull(aggFields)
Preconditions.checkArgument(aggregates.length == aggFields.length)

private var output: Row = _
protected var output: Row = _
// state to hold the accumulators of the aggregations
private var accumulatorState: ValueState[Row] = _
// state to hold rows until the next watermark arrives
Expand Down Expand Up @@ -162,30 +162,9 @@ class UnboundedEventTimeOverProcessFunction(
val curRowList = rowMapState.get(curTimestamp)
collector.setAbsoluteTimestamp(curTimestamp)

var j = 0
while (j < curRowList.size) {
val curRow = curRowList.get(j)
i = 0

// copy forwarded fields to output row
while (i < forwardedFieldCount) {
output.setField(i, curRow.getField(i))
i += 1
}

// update accumulators and copy aggregates to output row
i = 0
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
// emit output row
collector.collect(output)
j += 1
}
// process all rows with the same timestamp; the mechanism differs between ROWS and RANGE
processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)

rowMapState.remove(curTimestamp)
}

Expand All @@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
* If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
* an append with O(1).
*/
private def insertToSortedList(recordTimeStamp: Long) = {
private def insertToSortedList(recordTimestamp: Long) = {
val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
var continue = true
while (listIterator.hasPrevious && continue) {
val timestamp = listIterator.previous
if (recordTimeStamp >= timestamp) {
if (recordTimestamp >= timestamp) {
listIterator.next
listIterator.add(recordTimeStamp)
listIterator.add(recordTimestamp)
continue = false
}
}

if (continue) {
sortedTimestamps.addFirst(recordTimeStamp)
sortedTimestamps.addFirst(recordTimestamp)
}
}

/**
* Processes all rows that share the same timestamp. The mechanism differs between
* ROWS and RANGE windows, so each concrete subclass provides its own implementation.
*
* @param curRowList the rows stored for the timestamp that is currently processed
* @param lastAccumulator row whose fields hold the accumulators of the aggregations
* @param out collector used to emit the output rows
*/
def processElementsWithSameTimestamp(
curRowList: JList[Row],
lastAccumulator: Row,
out: Collector[Row]): Unit

}

/**
  * ProcessFunction for unbounded event-time OVER windows that use the ROWS clause.
  *
  * ROWS defines the window frame physically, row by row. Every row is therefore
  * accumulated and emitted on its own, even if several rows carry the same timestamp.
  */
class UnboundedEventTimeRowsOverProcessFunction(
    aggregates: Array[AggregateFunction[_]],
    aggFields: Array[Int],
    forwardedFieldCount: Int,
    intermediateType: TypeInformation[Row],
    inputType: TypeInformation[Row])
  extends UnboundedEventTimeOverProcessFunction(
    aggregates,
    aggFields,
    forwardedFieldCount,
    intermediateType,
    inputType) {

  /**
    * Accumulates and emits the given rows one at a time, so each emitted row carries
    * the aggregate value up to and including itself.
    *
    * @param curRowList      rows that share the timestamp being processed
    * @param lastAccumulator row whose fields are the accumulators, one per aggregate
    * @param out             collector that receives one output row per input row
    */
  override def processElementsWithSameTimestamp(
      curRowList: JList[Row],
      lastAccumulator: Row,
      out: Collector[Row]): Unit = {

    var rowIdx = 0
    while (rowIdx < curRowList.size) {
      val row = curRowList.get(rowIdx)

      // the forwarded input fields occupy the leading positions of the output row
      var fieldIdx = 0
      while (fieldIdx < forwardedFieldCount) {
        output.setField(fieldIdx, row.getField(fieldIdx))
        fieldIdx += 1
      }

      // fold the row into each accumulator and write the running aggregate value
      // behind the forwarded fields
      var aggIdx = 0
      while (aggIdx < aggregates.length) {
        val acc = lastAccumulator.getField(aggIdx).asInstanceOf[Accumulator]
        val aggregate = aggregates(aggIdx)
        aggregate.accumulate(acc, row.getField(aggFields(aggIdx)))
        output.setField(forwardedFieldCount + aggIdx, aggregate.getValue(acc))
        aggIdx += 1
      }

      out.collect(output)
      rowIdx += 1
    }
  }
}


/**
  * ProcessFunction for unbounded event-time OVER windows that use the RANGE clause.
  *
  * RANGE includes all rows within the window frame that have the same ORDER BY value
  * as the current row. Rows with the same timestamp are therefore accumulated together
  * first and then all emitted with the same aggregate values.
  */
class UnboundedEventTimeRangeOverProcessFunction(
    aggregates: Array[AggregateFunction[_]],
    aggFields: Array[Int],
    forwardedFieldCount: Int,
    intermediateType: TypeInformation[Row],
    inputType: TypeInformation[Row])
  extends UnboundedEventTimeOverProcessFunction(
    aggregates,
    aggFields,
    forwardedFieldCount,
    intermediateType,
    inputType) {

  /**
    * Accumulates all given rows and then emits each of them with the same aggregates.
    *
    * @param curRowList      rows that share the timestamp being processed
    * @param lastAccumulator row whose fields are the accumulators, one per aggregate
    * @param out             collector that receives one output row per input row
    */
  override def processElementsWithSameTimestamp(
      curRowList: JList[Row],
      lastAccumulator: Row,
      out: Collector[Row]): Unit = {

    // step 1: fold every row of this timestamp into the accumulators before emitting
    // anything, because all rows with the same timestamp must see the same result
    var rowIdx = 0
    while (rowIdx < curRowList.size) {
      val curRow = curRowList.get(rowIdx)
      var aggIdx = 0
      while (aggIdx < aggregates.length) {
        val accumulator = lastAccumulator.getField(aggIdx).asInstanceOf[Accumulator]
        aggregates(aggIdx).accumulate(accumulator, curRow.getField(aggFields(aggIdx)))
        aggIdx += 1
      }
      rowIdx += 1
    }

    // step 2: write the aggregate values into the reused output row once; they are
    // identical for every row emitted below, so there is no need to recompute them
    // per row
    var outIdx = 0
    while (outIdx < aggregates.length) {
      val accumulator = lastAccumulator.getField(outIdx).asInstanceOf[Accumulator]
      output.setField(forwardedFieldCount + outIdx, aggregates(outIdx).getValue(accumulator))
      outIdx += 1
    }

    // step 3: emit one output row per input row; only the forwarded fields change
    rowIdx = 0
    while (rowIdx < curRowList.size) {
      val curRow = curRowList.get(rowIdx)

      // copy forwarded fields to output row
      var fieldIdx = 0
      while (fieldIdx < forwardedFieldCount) {
        output.setField(fieldIdx, curRow.getField(fieldIdx))
        fieldIdx += 1
      }

      out.collect(output)
      rowIdx += 1
    }
  }
}

0 comments on commit 44f9c76

Please sign in to comment.