[FLINK-8868] [table] Support Table Function as Table Source for Stream Sql #6574
Conversation
Force-pushed 998834d to feaf163.
FLINK-8688 is "Enable distinct aggregation for data stream on Table/SQL API", which doesn't seem related to this PR.
It should be FLINK-8868, I'll fix this.
Force-pushed e6b2fcf to 94f7da2.
Thanks for the contribution! I left a couple of comments in the code.
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
tEnv.registerFunction("udtf", new TableFunc2WithBase)
```
Do we need this `TableFunc2WithBase` here? Couldn't we use `TableFunc2`? What does `TableFunc2WithBase` give us?
```scala
@@ -897,6 +897,45 @@ class SqlITCase extends StreamingWithStateTestBase {

    assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
  }

  @Test
  def tableFunctionAsSource(): Unit = {
```
I think this file is already too big; at the very least, these new tests should be put into something like `TableFunctionITCase`.
```scala
val scan: FlinkLogicalTableFunctionScan = rel.asInstanceOf[FlinkLogicalTableFunctionScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)

new DataStreamTableFunctionScan(
```
Should this conversion always happen? How does it interact with, for example, `LogicalCorrelate` nodes, `DataStreamCorrelateRule`, and `org.apache.flink.table.api.stream.sql.CorrelateTest`?
Does it work only because, if the Volcano planner picks this rule in the `LogicalCorrelate` case, it is later unable to convert `LogicalCorrelate` to `DataStreamCorrelate` and thus backtracks from `DataStreamTableFunctionScanRule`?
There are two cases in which a TableFunction acts like a source: one is `SELECT * FROM LATERAL TABLE(udtf())`; the other is when the left table does not correlate with the right table, so no `LogicalCorrelate` node is produced.
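The two shapes can be sketched in SQL (the table, column, and function names here are assumptions for illustration, not taken from the PR):

```sql
-- Case 1: the table function is the only input, acting as a source.
SELECT * FROM LATERAL TABLE(udtf());

-- Case 2: a left table is present, but the table function call takes
-- no columns from it, so the planner produces no LogicalCorrelate node.
SELECT t.a, s.x
FROM MyTable AS t, LATERAL TABLE(udtf()) AS s(x);
```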
```scala
@@ -897,6 +897,45 @@ class SqlITCase extends StreamingWithStateTestBase {

    assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
  }

  @Test
```
Please add tests for the Table API as well. I would expect this error there:

```
org.apache.flink.table.api.ValidationException: Cannot translate a query with an unbounded table function call.
	at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
	at org.apache.flink.table.utils.StreamTableTestUtil.printTable(TableTestBase.scala:321)
```
I added a test case for the Table API; a `ValidationException` with "TableFunction can only be used in join and leftOuterJoin" is thrown.
Sorry, I meant the other way around: we should try to fix this for the Table API. By saying "I would expect this error there", I didn't mean "I would like a test asserting this validation exception", but rather "I think you missed testing this feature on the Table API, and it will probably fail there with a validation exception".
@pnowojski, I think it's better to support only SQL this time; the Table API needs more effort.
What's the problem with the Table API here? I had a suspicion that this `ValidationException`:

```scala
def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) {
  throw new ValidationException("Cannot translate a query with an unbounded table function call.")
} else {
  logicalPlan.toRelNode(relBuilder)
}
```

is being thrown mostly as a precaution, since previously there was no execution code to support it. With this PR, that will no longer be the case. What would happen if we simply removed it?
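To make the question concrete, the suggested simplification would drop the guard and translate unconditionally (a sketch of the proposal, not code from the PR):

```scala
// Sketch: if this PR adds execution support for unbounded table
// function calls, the precautionary check above could be removed
// and translation would proceed for every logical plan.
def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
```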
```scala
  |
  |}
  |
  |class ${collectorName} implements ${classOf[Collector[_]].getCanonicalName} {
```
Isn't this duplicated with `org.apache.flink.table.plan.nodes.CommonCorrelate#generateCollector`?
```scala
val functionCode =
  s"""
    |public class $funcName extends ${classOf[RichSourceFunction[_]].getCanonicalName} {
```
And doesn't this share a lot of code with `org.apache.flink.table.plan.nodes.CommonCorrelate#generateFunction`?
I'll do some refactoring here.
```scala
    outputType: TypeInformation[T])
  extends CodeGenerator(config, false, new RowTypeInfo(), None, None) {

  def generateSourceFunction(
```
Sorry for asking what is maybe a stupid question: why do we need to generate so much code? Shouldn't most of the code here (maybe even all of it) live in standard Scala/Java classes, with the only generated piece being a `RexCall` as an implementation of some interface, used by a concrete Java/Scala class?
In other words, generating an implementation of `RichSourceFunction` seems like a bit of an overkill.
I think the UDTF might produce infinite records and can do some initialization, which makes it look like a real source.
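A minimal sketch of such a UDTF (the class name and emitted type are assumptions for illustration, not from the PR):

```scala
import org.apache.flink.table.functions.TableFunction

// Sketch: a TableFunction that emits an unbounded sequence of values,
// behaving like a SourceFunction when used without a left input.
// open()/close() inherited from UserDefinedFunction could perform
// source-style initialization and cleanup.
class TickerFunc extends TableFunction[Long] {
  def eval(): Unit = {
    var i = 0L
    while (true) { // never terminates: an infinite record stream
      collect(i)
      i += 1
    }
  }
}
```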
@twalthr, could you take a look at this one? Especially at the code generation parts, since I have little to no experience in this regard.
What is the purpose of the change
Support Table Function as a table source for streaming SQL.
A TableFunction might produce infinite records, so support for batch SQL should be discussed separately.
Brief change log
DataStreamTableFunctionScan
Verifying this change
This change added tests and can be verified as follows:
new test cases in:
org.apache.flink.table.runtime.stream.sql.SqlITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation