New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-11975][table-planner-blink] Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)' #8035
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
dec0598
to
2386edf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @beyond1920 for your pull request, I left some comments.
...flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
Outdated
Show resolved
Hide resolved
...able-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/TypeUtils.java
Outdated
Show resolved
Hide resolved
...able/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
Outdated
Show resolved
Hide resolved
...flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java
Outdated
Show resolved
Hide resolved
...nk-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/BaseRowUtil.java
Outdated
Show resolved
Hide resolved
...nk-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/BaseRowUtil.java
Outdated
Show resolved
Hide resolved
...nk-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/BaseRowUtil.java
Outdated
Show resolved
Hide resolved
...le/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
Outdated
Show resolved
Hide resolved
...le-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseUpsertStreamTableSink.scala
Outdated
Show resolved
Hide resolved
* | ||
* @tparam T Type of records that this [[TableSink]] expects and supports. | ||
*/ | ||
trait BaseUpsertStreamTableSink[T] extends StreamTableSink[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modify it to java Interface?
* | ||
* @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. | ||
*/ | ||
trait BatchTableSink[T] extends TableSink[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java interface?
...le-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseUpsertStreamTableSink.scala
Outdated
Show resolved
Hide resolved
* | ||
* @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. | ||
*/ | ||
trait AppendStreamTableSink[T] extends StreamTableSink[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java interface?
...e-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
Outdated
Show resolved
Hide resolved
...ink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/util/TimeConvertUtils.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR! I think the codes need some more refinements, left some comments
/** | ||
* A {@link TimeConvertUtils} is used to convert Time Type. | ||
*/ | ||
public class TimeConvertUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could keep this in table-planner-blink
. And a further question is, do we really need this? What's the differences between this utility and DateTimeUtils
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimeConvertUtils is to fix TimeZone problems, which is refactor in FLINK-11898, So temporally to throw an Exception here.
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/StateUtil.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/StateUtil.java
Outdated
Show resolved
Hide resolved
...nk-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
Outdated
Show resolved
Hide resolved
...nk-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
Outdated
Show resolved
Hide resolved
...nk-table-runtime-blink/src/main/java/org/apache/flink/table/generated/ValuesInputFormat.java
Outdated
Show resolved
Hide resolved
/** | ||
* Returns [[DamBehavior]] of this node. | ||
*/ | ||
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PIPELINED?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?? I don't catch your meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The definition of PIPELINED in dam behavior is that the operator will not cache any data and all records will pass through in a pipelined fashion. For sink operator, the records will not pass through it, right? To me, the behavior is more like to be FULL_DAM
, except the sink operator will not return or output any record after it consumes all the input data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DamBehavior of sink is never used actually, so it's value will not effect schedule or deadlock breaking. However, Full_DAM is a more appropriate choice in this point of view, I will update it.
...er-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
Outdated
Show resolved
Hide resolved
...er-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
Outdated
Show resolved
Hide resolved
...er-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
Outdated
Show resolved
Hide resolved
...er-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
Show resolved
Hide resolved
@JingsongLi thanks for your comments. All done, except one minor question, TableSink interfaces still in Scala instead of Java, could do it later with all sinks and sources. |
@KurtYoung Thanks for your comments. All done, except two minor question:
|
0b90cfc
to
b2f2f20
Compare
…CT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
cb3f443
to
32a1fb3
Compare
+1 LGTM |
2. Rename BaseUpsertStreamTableSink, BaseRetractStreamTableSink 3. Update codeStyle 4. other minor update based on comments
...link-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/ValuesInputFormat.java
Outdated
Show resolved
Hide resolved
...-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
Outdated
Show resolved
Hide resolved
ea18552
to
c6ce3fe
Compare
2. Move valuesInputFormat to values subpackage 3. Minor update in StreamExecSink and BatchExecSink
What is the purpose of the change
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
Brief change log
Verifying this change
ITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation