New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-11896] [table-planner-blink] Introduce stream physical nodes #7969
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @godfreyhe for the effort and the nice code comment.
I left some ideas below.
extends SingleRel(cluster, traits, inputNode) { | ||
inputRel: RelNode, | ||
val rowtimeFieldIndex: Option[Int], | ||
val watermarkOffset: Option[Long]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to watermarkDelay ? In fact, it is the delay between max-timestamp and current watermark.
The offset terminology is usually means timezone offset in Flink.
/** | ||
* Whether the [[StreamPhysicalRel]] requires rowtime watermark in processing logic. | ||
*/ | ||
def requireWatermark: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about to remove all default value for the interface methods to force the concrete node to implement them?
.item("where", | ||
RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)) | ||
.item("join", getRowType.getFieldNames.mkString(", ")) | ||
.item("joinType", RelExplainUtil.joinTypeToString(flinkJoinType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about to move joinType
before where
? IMO, the information of joinType is more important than where.
pw.input("left", getLeft).input("right", getRight) | ||
.item("where", | ||
RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)) | ||
.item("join", getRowType.getFieldNames.mkString(", ")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about rename join
to select
?
.item("key", uniqueKeys.map(fieldNames.get).mkString(", ")) | ||
.item("select", fieldNames.mkString(", ")) | ||
.item("order", orderString) | ||
.item("mode", if (isLastRowMode) "LastRow" else "FirstRow") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about moving mode
before key
and remove select
?
Do we really need to display field names in this node ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
field names is needless if the RelNode
does not change output type
.item("partitionBy", RelExplainUtil.fieldToString(partitionKey.toArray, inputRowType)) | ||
.item("orderBy", RelExplainUtil.collationToString(sortCollation, inputRowType)) | ||
.item("rankRange", rankRange.toString(inputRowType.getFieldNames)) | ||
.item("strategy", getStrategy()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to move strategy before rankFunction as it is more important.
* distinct agg. partial-aggregation produces a partial distinct agg result for each bucket group, | ||
* and then final-aggregation produces final result based on partial result. | ||
* Both partial-aggregation and final-aggregation need to shuffle data. | ||
* partial-final aggregation is enabled only if all distinct aggregate calls are mergeable. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the local global can be enabled when all aggregate calls are mergeable. And the partial final can be enabled when all aggregates are splittable.
2. perfect comments 3. adjust explain terms order
Thanks for your suggestion @wuchong. I have updated this PR based on comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @godfreyhe , I only left a minor comment.
Others looks good to me.
watermarkOffset: Long) | ||
extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) | ||
rowtimeFieldIndex: Option[Int], | ||
watermarkOffset: Option[Long]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename all the other watermarkOffset
to watermarkDelay
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* Base physical class for flink [[Join]]. | ||
*/ | ||
trait CommonJoin extends Join with FlinkPhysicalRel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to CommonPhysicalJoin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, i find out that CommonExchange
is inherited directly from FlinkRelNode
, but only have physical children. Could you also make CommonExchange
to inherit from FlinkPhysicalRel
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
+1 from my side. |
…nJoin` to `CommonPhysicalJoin`
LGTM, merging |
What is the purpose of the change
Introduce stream physical RelNode
Brief change log
Verifying this change
This change is an initialization work without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation