Skip to content

Commit

Permalink
[FLINK-5047] [table] Add sliding group-windows for batch tables
Browse files Browse the repository at this point in the history
This closes apache#3364.
  • Loading branch information
twalthr committed Mar 8, 2017
1 parent b79c7ef commit a96ba15
Show file tree
Hide file tree
Showing 15 changed files with 1,257 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
def getValue(accumulator: Accumulator): T

/**
* Process the input values and update the provided accumulator instance.
* Processes the input values and updates the provided accumulator instance.
*
* @param accumulator the accumulator which contains the current
* aggregated results
Expand All @@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
def accumulate(accumulator: Accumulator, input: Any): Unit

/**
* Merge a list of accumulator instances into one accumulator instance.
* Merges a list of accumulator instances into one accumulator instance.
*
* IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the
* IMPORTANT: You may only return a new accumulator instance or the first accumulator of the
* input list. If you return another instance, the result of the aggregation function might be
* incorrect.
*
Expand All @@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
*
* @return The type information for the accumulator.
*/
def getAccumulatorType(): TypeInformation[_] = null
def getAccumulatorType: TypeInformation[_] = null
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,25 @@ class DataSetWindowAggregate(

// whether identifiers are matched case-sensitively
val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()

window match {
case EventTimeTumblingGroupWindow(_, _, size) =>
createEventTimeTumblingWindowDataSet(
inputDS,
isTimeInterval(size.resultType),
caseSensitive)

case EventTimeSessionGroupWindow(_, _, gap) =>
createEventTimeSessionWindowDataSet(inputDS, caseSensitive)
case EventTimeSlidingGroupWindow(_, _, _, _) =>
throw new UnsupportedOperationException(
"Event-time sliding windows in a batch environment are currently not supported")

case EventTimeSlidingGroupWindow(_, _, size, slide) =>
createEventTimeSlidingWindowDataSet(
inputDS,
isTimeInterval(size.resultType),
asLong(size),
asLong(slide),
caseSensitive)

case _: ProcessingTimeGroupWindow =>
throw new UnsupportedOperationException(
"Processing-time tumbling windows are not supported in a batch environment, " +
Expand All @@ -130,7 +138,6 @@ class DataSetWindowAggregate(
}
}


private def createEventTimeTumblingWindowDataSet(
inputDS: DataSet[Row],
isTimeWindow: Boolean,
Expand Down Expand Up @@ -312,6 +319,108 @@ class DataSetWindowAggregate(
}
}

/**
  * Translates an event-time sliding group window into a batch (DataSet) plan.
  *
  * The plan has three stages:
  *  1. every input row is mapped by the window prepare function, which initializes the
  *     aggregations (and aligns the rowtime for pre-tumbling in case of a time window),
  *  2. the mapped rows are replicated/omitted for each pane, either by a pre-tumbling
  *     group-reduce (only if all aggregates support partial merging and
  *     `determineLargestTumblingSize(size, slide)` is greater than 1) or by a plain flat map,
  *  3. the rows are grouped by the grouping keys plus the last field of the prepared rows
  *     (the window start) and reduced to the final output rows.
  *
  * @param inputDS               the input data set
  * @param isTimeWindow          `true` for a time window; any other (count) window is rejected
  * @param size                  the window size, used to decide whether pre-tumbling is worth it
  * @param slide                 the window slide, used to decide whether pre-tumbling is worth it
  * @param isParserCaseSensitive forwarded to the prepare function factories
  * @return the aggregated data set, typed with the internal row type of this node
  * @throws UnsupportedOperationException if `isTimeWindow` is `false`
  */
private def createEventTimeSlidingWindowDataSet(
    inputDS: DataSet[Row],
    isTimeWindow: Boolean,
    size: Long,
    slide: Long,
    isParserCaseSensitive: Boolean)
  : DataSet[Row] = {

  // stage 1: MapFunction that initializes the aggregations; for time windows it also aligns
  // the rowtime so that partial aggregates can be pre-tumbled afterwards
  val prepareMapper = createDataSetWindowPrepareMapFunction(
    window,
    namedAggregates,
    grouping,
    inputType,
    isParserCaseSensitive)

  val initializedRows = inputDS
    .map(prepareMapper)
    .name(prepareOperatorName)

  val initializedRowType = initializedRows.getType

  val outputRowType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
  val keyPositions = grouping.indices.toArray

  // partial aggregation is only possible if every aggregate can merge partial results
  val supportsPartialMerge = doAllSupportPartialMerge(
    namedAggregates.map(_.getKey),
    inputType,
    grouping.length)

  // pre-tumbling only pays off if the largest tumbling size exceeds 1
  val worthPreTumbling = determineLargestTumblingSize(size, slide) > 1

  // count windows are rejected before any pane handling is set up
  if (!isTimeWindow) {
    throw new UnsupportedOperationException(
      "Count sliding group windows on event-time are currently not supported.")
  }

  // stage 2: replicate/omit the rows for each pane of the time window
  val paneRows = if (supportsPartialMerge && worthPreTumbling) {
    // partial aggregates: the aligned rowtime is the last field of the mapped rows
    val alignedRowtimePos = initializedRowType.getArity - 1
    val keysAndAlignedRowtime = keyPositions :+ alignedRowtimePos

    // GroupReduceFunction that pre-tumbles and replicates/omits the content for each pane
    val preTumbleReducer = createDataSetSlideWindowPrepareGroupReduceFunction(
      window,
      namedAggregates,
      grouping,
      inputType,
      isParserCaseSensitive)

    initializedRows.asInstanceOf[DataSet[Row]]
      .groupBy(keysAndAlignedRowtime: _*)
      .reduceGroup(preTumbleReducer)
      .name(prepareOperatorName)
  } else {
    // non-partial aggregates: FlatMapFunction that only replicates/omits the content
    // for each pane
    val paneReplicator = createDataSetSlideWindowPrepareFlatMapFunction(
      window,
      namedAggregates,
      grouping,
      initializedRowType,
      isParserCaseSensitive)

    initializedRows
      .flatMap(paneReplicator)
  }

  val paneRowType = paneRows.getType

  // stage 3: GroupReduceFunction for the final aggregation and conversion to the output row
  val finalAggregator = createDataSetWindowAggregationGroupReduceFunction(
    window,
    namedAggregates,
    inputType,
    rowRelDataType,
    grouping,
    namedProperties,
    isInputCombined = false)

  // the window start is the last field of the intermediate result
  val keysAndWindowStart = keyPositions :+ (paneRowType.getArity - 1)

  paneRows
    .groupBy(keysAndWindowStart: _*)
    .reduceGroup(finalAggregator)
    .returns(outputRowType)
    .name(aggregateOperatorName)
}

private def prepareOperatorName: String = {
val aggString = aggregationToString(
inputType,
Expand Down

0 comments on commit a96ba15

Please sign in to comment.