Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-6200][SQL] support unbounded event time range window #3649

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -127,14 +127,9 @@ class DataStreamOverAggregate(
// row-time OVER window
if (overWindow.lowerBound.isPreceding &&
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
if (overWindow.isRows) {
// unbounded preceding OVER ROWS window
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
} else {
// unbounded preceding OVER RANGE window
throw new TableException(
"row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
}
// unbounded preceding OVER window (ROWS or RANGE clause)
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS, overWindow.isRows)

} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
if (overWindow.isRows) {
Expand Down Expand Up @@ -240,7 +235,8 @@ class DataStreamOverAggregate(
}

def createUnboundedAndCurrentRowEventTimeOverWindow(
inputDS: DataStream[Row]): DataStream[Row] = {
inputDS: DataStream[Row],
isRows: Boolean): DataStream[Row] = {

val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
Expand All @@ -251,7 +247,8 @@ class DataStreamOverAggregate(

val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
namedAggregates,
inputType)
inputType,
isRows)

val result: DataStream[Row] =
// partitioned aggregation
Expand Down
Expand Up @@ -140,7 +140,8 @@ object AggregateUtil {
*/
private[flink] def createUnboundedEventTimeOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
inputType: RelDataType,
isRows: Boolean): UnboundedEventTimeOverProcessFunction = {

val (aggFields, aggregates) =
transformToAggregateFunctions(
Expand All @@ -150,12 +151,23 @@ object AggregateUtil {

val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)

new UnboundedEventTimeOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
if (isRows) {
// ROWS unbounded over process function
new UnboundedEventTimeRowsOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
} else {
// RANGE unbounded over process function
new UnboundedEventTimeRangeOverProcessFunction(
aggregates,
aggFields,
inputType.getFieldCount,
aggregationStateType,
FlinkTypeFactory.toInternalRowTypeInfo(inputType))
}
}

/**
Expand Down
Expand Up @@ -41,7 +41,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
* @param inputType the type of the input rows that are held in state
*
*/
class UnboundedEventTimeOverProcessFunction(
abstract class UnboundedEventTimeOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
private val aggFields: Array[Int],
private val forwardedFieldCount: Int,
Expand All @@ -53,7 +53,7 @@ class UnboundedEventTimeOverProcessFunction(
Preconditions.checkNotNull(aggFields)
Preconditions.checkArgument(aggregates.length == aggFields.length)

private var output: Row = _
protected var output: Row = _
// state to hold the accumulators of the aggregations
private var accumulatorState: ValueState[Row] = _
// state to hold rows until the next watermark arrives
Expand Down Expand Up @@ -162,30 +162,9 @@ class UnboundedEventTimeOverProcessFunction(
val curRowList = rowMapState.get(curTimestamp)
collector.setAbsoluteTimestamp(curTimestamp)

var j = 0
while (j < curRowList.size) {
val curRow = curRowList.get(j)
i = 0

// copy forwarded fields to output row
while (i < forwardedFieldCount) {
output.setField(i, curRow.getField(i))
i += 1
}

// update accumulators and copy aggregates to output row
i = 0
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
// emit output row
collector.collect(output)
j += 1
}
// process rows with identical timestamps; the mechanism differs between ROWS and RANGE windows
processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)

rowMapState.remove(curTimestamp)
}

Expand All @@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
* If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
* an append with O(1).
*/
private def insertToSortedList(recordTimeStamp: Long) = {
private def insertToSortedList(recordTimestamp: Long) = {
val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
var continue = true
while (listIterator.hasPrevious && continue) {
val timestamp = listIterator.previous
if (recordTimeStamp >= timestamp) {
if (recordTimestamp >= timestamp) {
listIterator.next
listIterator.add(recordTimeStamp)
listIterator.add(recordTimestamp)
continue = false
}
}

if (continue) {
sortedTimestamps.addFirst(recordTimeStamp)
sortedTimestamps.addFirst(recordTimestamp)
}
}

/**
 * Processes all buffered rows that share the same timestamp.
 * The emission mechanism differs between the two window types:
 * ROWS windows emit an incremental aggregate per row, while RANGE windows
 * emit the same final aggregate value for every row with that timestamp.
 *
 * @param curRowList      the rows sharing the current timestamp
 * @param lastAccumulator the accumulator row, updated in place while aggregating
 * @param out             the collector used to emit result rows
 */
def processElementsWithSameTimestamp(
curRowList: JList[Row],
lastAccumulator: Row,
out: Collector[Row]): Unit

}

/**
* A ProcessFunction to support unbounded ROWS window.
* With the ROWS option you define on a physical level how many rows are included in your window frame
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line violates the 100 character limit of the Scala code style.
Please run a local build before opening a PR to capture such problems (mvn clean install inside of the ./flink-libraries/flink-table folder is usually sufficient and takes ~5 mins).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding me, i will pay attention next time.

*/
/**
 * A ProcessFunction to support unbounded ROWS windows.
 * With the ROWS option the window frame grows row by row, so each input row
 * is emitted with the aggregate value accumulated up to and including itself.
 */
class UnboundedEventTimeRowsOverProcessFunction(
    aggregates: Array[AggregateFunction[_]],
    aggFields: Array[Int],
    forwardedFieldCount: Int,
    intermediateType: TypeInformation[Row],
    inputType: TypeInformation[Row])
  extends UnboundedEventTimeOverProcessFunction(
    aggregates,
    aggFields,
    forwardedFieldCount,
    intermediateType,
    inputType) {

  /**
   * Emits one output row per input row. Each row first folds its aggregate
   * inputs into the shared accumulators and is then emitted with the updated
   * (incremental) aggregate values — the ROWS window semantics.
   */
  override def processElementsWithSameTimestamp(
      curRowList: JList[Row],
      lastAccumulator: Row,
      out: Collector[Row]): Unit = {

    var rowIndex = 0
    while (rowIndex < curRowList.size) {
      val currentRow = curRowList.get(rowIndex)

      // forward the original input fields unchanged into the output row
      var fieldIndex = 0
      while (fieldIndex < forwardedFieldCount) {
        output.setField(fieldIndex, currentRow.getField(fieldIndex))
        fieldIndex += 1
      }

      // fold the current row into each accumulator, then append the fresh
      // aggregate value behind the forwarded fields
      var aggIndex = 0
      while (aggIndex < aggregates.length) {
        val accumulator = lastAccumulator.getField(aggIndex).asInstanceOf[Accumulator]
        aggregates(aggIndex).accumulate(accumulator, currentRow.getField(aggFields(aggIndex)))
        output.setField(forwardedFieldCount + aggIndex, aggregates(aggIndex).getValue(accumulator))
        aggIndex += 1
      }

      // ROWS semantics: every row gets its own incremental aggregate result
      out.collect(output)
      rowIndex += 1
    }
  }
}


/**
 * A ProcessFunction to support unbounded RANGE windows.
 * The RANGE option includes all rows within the window frame that have the
 * same ORDER BY value (here: the same timestamp) as the current row, so every
 * row sharing a timestamp must be emitted with the same final aggregate value.
 */
class UnboundedEventTimeRangeOverProcessFunction(
    aggregates: Array[AggregateFunction[_]],
    aggFields: Array[Int],
    forwardedFieldCount: Int,
    intermediateType: TypeInformation[Row],
    inputType: TypeInformation[Row])
  extends UnboundedEventTimeOverProcessFunction(
    aggregates,
    aggFields,
    forwardedFieldCount,
    intermediateType,
    inputType) {

  /**
   * Two-pass emission. First all rows with the current timestamp are folded
   * into the accumulators; only then is each row emitted, so all peer rows
   * carry the identical (final) aggregate value — the RANGE window semantics.
   */
  override def processElementsWithSameTimestamp(
      curRowList: JList[Row],
      lastAccumulator: Row,
      out: Collector[Row]): Unit = {

    var j = 0
    var i = 0
    // First pass: accumulate every row. All rows with the same timestamp must
    // observe the aggregation result over the complete peer group.
    while (j < curRowList.size) {
      val curRow = curRowList.get(j)
      i = 0
      while (i < aggregates.length) {
        val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
        aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
        i += 1
      }
      j += 1
    }

    // Second pass: emit one output row per input row, each carrying the
    // forwarded input fields plus the shared final aggregate values.
    j = 0
    while (j < curRowList.size) {
      val curRow = curRowList.get(j)

      // copy forwarded fields to output row
      i = 0
      while (i < forwardedFieldCount) {
        output.setField(i, curRow.getField(i))
        i += 1
      }

      // copy aggregates to output row
      i = 0
      while (i < aggregates.length) {
        val index = forwardedFieldCount + i
        val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
        output.setField(index, aggregates(i).getValue(accumulator))
        i += 1
      }
      out.collect(output)
      j += 1
    }
  }
}