Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public boolean matches(RelOptRuleCall call) {
boolean existJavaFunction =
aggCalls.stream().anyMatch(x -> !PythonUtil.isPythonAggregate(x, null));
if (existPandasFunction || existGeneralPythonFunction) {
if (existJavaFunction) {
if (existGeneralPythonFunction) {
throw new TableException(
"Python UDAF and Java/Scala UDAF cannot be used together.");
"non-Pandas UDAFs are not supported in batch mode currently.");
}
if (existPandasFunction && existGeneralPythonFunction) {
if (existJavaFunction) {
throw new TableException(
"Pandas UDAF and non-Pandas UDAF cannot be used together.");
"Python UDAF and Java/Scala UDAF cannot be used together.");
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ class BatchPhysicalOverAggregateRule
.map(_._2)
.exists(_.map(_._1).exists(!isPythonAggregate(_)))
if (existPandasFunction || existGeneralPythonFunction) {
if (existGeneralPythonFunction) {
throw new TableException("non-Pandas UDAFs are not supported in batch mode currently.")
}
if (existJavaFunction) {
throw new TableException("Python UDAF and Java/Scala UDAF cannot be used together.")
}
if (existPandasFunction && existGeneralPythonFunction) {
throw new TableException("Pandas UDAF and non-Pandas UDAF cannot be used together.")
}
}
overWindowAgg = if (existJavaFunction) {
new BatchPhysicalOverAggregate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.batch.table

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{PandasAggregateFunction, TestPythonAggregateFunction}
import org.apache.flink.table.planner.utils.TableTestBase

import org.junit.Test
Expand Down Expand Up @@ -101,4 +101,19 @@ class PythonGroupWindowAggregateTest extends TableTestBase {

util.verifyExecPlan(resultTable)
}

@Test(expected = classOf[TableException])
def testGeneralEventTimeTumblingGroupWindowOverTime(): Unit = {
val util = batchTestUtil()
val sourceTable = util.addTableSource[(Int, Long, Int, Long)](
"MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
val func = new TestPythonAggregateFunction

val resultTable = sourceTable
.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'b)
.select('b, 'w.start,'w.end, func('a, 'c))

util.verifyExecPlan(resultTable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.batch.table

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{PandasAggregateFunction, TestPythonAggregateFunction}
import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test

Expand Down Expand Up @@ -63,4 +63,23 @@ class PythonOverWindowAggregateTest extends TableTestBase {

util.verifyExecPlan(resultTable)
}

@Test(expected = classOf[TableException])
def testGeneralRangeOverWindowAggregate(): Unit = {
val util = batchTestUtil()
val sourceTable = util.addTableSource[(Int, Long, Int, Long)](
"MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
val func = new TestPythonAggregateFunction

val resultTable = sourceTable
.window(
Over
partitionBy 'b
orderBy 'rowtime
preceding UNBOUNDED_RANGE
as 'w)
.select('b, func('a, 'c) over 'w)

util.verifyExecPlan(resultTable)
}
}