[FLINK-6090] [table] Add RetractionRule at the stage of decoration #3696
Thank you for this PR @hequn8128 !
I think the rules work correctly and assign the right AccMode to each DataSetRel node.
Also the tests look very good.
However, I think the code could be improved in a few ways.
- The rules could be implemented in a more Scala-esque way. There are many var variables and mutable collections, which are discouraged in Scala.
- It took me quite some time to understand the logic of the rules. In fact, I only fully understood how they worked when I debugged the code. I think this could be improved by renaming some of the classes, methods, and variables.
- I would split the RetractionTrait into two traits. Unless I didn't understand it correctly, the needToRetract property is only needed to compute the correct AccMode and not needed afterwards.
I made a couple of comments in the PR and suggested some refactorings.
I also created a commit that contains my proposed changes and opened a PR against your PR branch.
If you agree with these changes, I would squash them into your commit before merging.
An open question is whether we want to hardcode the operator classes in the rules or extend the DataStreamRel interface to provide the information needed to compute the AccMode of the plan operators. IMO, extending the interface would be the better approach because it keeps the information about the properties directly in the nodes. It might also be less error-prone when some operator implementations change.
Please let me know what you think @hequn8128 and @shaoxuan-wang.
Thanks, Fabian
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
class DataStreamCorrelate(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputNode: RelNode,
Please revert these changes. We try to avoid reformatting changes because they make PRs harder to review.
Hi, sorry for not leaving any comments about this fix. After the decoration phase, the class of inputNode is HepRelVertex, so val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) throws a ClassCastException. You can reproduce this exception by running the tests in DataStreamUserDefinedFunctionITCase. There is no problem after runVolcanoPlanner because DataStreamCorrelateRule does the transformation from RelSubset to DataStreamRel. I think it's better to override the input parameter and use getInput when translating to the plan. What do you think? Thanks!
Ah, OK. Thanks for the explanation!
That makes sense :-)
 * @param topRel The input RelNode
 * @return All child nodes
 */
def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
Change return type to Seq[RelNode]
 * @return All child nodes
 */
def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
  val topRelInputs = new ListBuffer[RelNode]()
method can be simplified to topRel.getInputs.asScala.transform(_.asInstanceOf[HepRelVertex].getCurrentRel)
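A minimal sketch of the one-expression form, for illustration only. The RelNode, HepRelVertex, Leaf, and Parent types below are hypothetical stand-ins so that the snippet runs without Calcite on the classpath. Note that this sketch uses map rather than transform: as far as I know, transform on the Buffer returned by asScala updates the wrapped Java list in place, whereas map builds a new collection and fits the suggested Seq[RelNode] return type.

```scala
import scala.collection.JavaConverters._

object ChildRelNodesSketch {
  // Hypothetical stand-ins for Calcite's RelNode and HepRelVertex.
  trait RelNode { def getInputs: java.util.List[RelNode] }

  final case class Leaf(name: String) extends RelNode {
    override def getInputs: java.util.List[RelNode] =
      java.util.Collections.emptyList[RelNode]()
  }

  // Wrapper node, as used by the HEP planner around the actual operator.
  final case class HepRelVertex(getCurrentRel: RelNode) extends RelNode {
    override def getInputs: java.util.List[RelNode] = getCurrentRel.getInputs
  }

  final case class Parent(inputs: List[RelNode]) extends RelNode {
    override def getInputs: java.util.List[RelNode] = inputs.asJava
  }

  // Unwraps every input vertex without a mutable ListBuffer.
  def getChildRelNodes(parent: RelNode): Seq[RelNode] =
    parent.getInputs.asScala
      .map(_.asInstanceOf[HepRelVertex].getCurrentRel)
      .toList
}
```

The cast still assumes that every input is a HepRelVertex, which only holds during the decoration phase discussed above.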
class InitProcessRule extends RelOptRule(
  operand(
    classOf[DataStreamRel], none()),
  "InitProcessRule") {
Rename to "InitRetractionRule"?
/**
 * Rule that init retraction trait inside a [[DataStreamRel]]. If a [[DataStreamRel]] does not
 * contain retraction trait, initialize one for the [[DataStreamRel]]. After
 * initiallization, needToRetract will be set to false and AccMode will be set to Acc.
"initiallization" -> "initialization"
def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
  val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
  if (null == retractionTrait) {
can be simplified to
null != retractionTrait && retractionTrait.getNeedToRetract
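For illustration, a small sketch of that simplification next to an Option-based variant. RetractionTrait here is a hypothetical one-field stand-in, not the class from the PR, and the method names are made up for the example.

```scala
object NeedToRetractSketch {
  // Hypothetical stand-in for the PR's RetractionTrait.
  final case class RetractionTrait(getNeedToRetract: Boolean)

  // Short-circuit form: && never evaluates the right side for a null trait.
  def containsNeedToRetract(retractionTrait: RetractionTrait): Boolean =
    null != retractionTrait && retractionTrait.getNeedToRetract

  // Option-based variant: Option(null) is None, and None.exists(...) is false.
  def containsNeedToRetractOpt(retractionTrait: RetractionTrait): Boolean =
    Option(retractionTrait).exists(_.getNeedToRetract)
}
```

Both forms return false for a missing trait, so either can replace the explicit if/else.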
val traitSet = relNode.getTraitSet
var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
if (null == retractionTrait) {
  retractionTrait = new RetractionTrait(true, AccMode.Acc)
I think it would be better to split RetractionTrait into two traits. As far as I can tell, the needToRetract property is only needed to compute the correct AccMode.
val bottomTraits = bottomRel.getTraitSet
if(!traitSetContainNeedToRetract(bottomTraits)){
  topRel match {
    case _: DataStreamGroupAggregate => true
I wonder whether it would be better to extend DataStreamRel with a couple of methods that expose the retraction behavior of each operator (for example requiresUpdatesAsRetractions(), forwardsRetractions(), consumesRetractions(), producesRetractions(), or producesUpdates()).
I think hardcoding the operator classes here is not a good solution, because it might easily break if the rules are not correctly updated when the operators change.
Hi, you are right. I have extended DataStreamRel with three methods: needsUpdatesAsRetraction(), producesUpdates() and consumesRetractions().
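A minimal sketch of what such an extension could look like. Only the three method names come from this thread; the simplified DataStreamRel trait, the default values, the GroupAggregateLike and CalcLike operators, and the inputMustRetract helper are assumptions made for the example.

```scala
object DataStreamRelSketch {
  // Hypothetical, simplified stand-in for DataStreamRel.
  // The defaults (an assumption) describe an operator without retraction needs.
  trait DataStreamRel {
    def needsUpdatesAsRetraction: Boolean = false
    def producesUpdates: Boolean = false
    def consumesRetractions: Boolean = false
  }

  // Assumed settings for an aggregating operator (illustrative values only).
  class GroupAggregateLike extends DataStreamRel {
    override def needsUpdatesAsRetraction: Boolean = true
    override def producesUpdates: Boolean = true
    override def consumesRetractions: Boolean = true
  }

  // An operator that keeps all defaults.
  class CalcLike extends DataStreamRel

  // The rule asks the node instead of pattern matching on operator classes.
  def inputMustRetract(parent: DataStreamRel): Boolean =
    parent.needsUpdatesAsRetraction
}
```

With this shape, a new operator declares its own behavior, and the rules do not need to be touched when operator classes are added or renamed.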
}

override def onMatch(call: RelOptRuleCall): Unit = {
  val topRel = call.rel(0).asInstanceOf[DataStreamRel]
Usually, top denotes the top node of a tree and bottom the leaf nodes of a tree. I think parent and children / child would be more appropriate terms here.
private var accMode = AccMode.Acc

def this(needToRetract: Boolean, accMode: AccMode) {
I would split this trait into two traits. Unless I'm wrong, we are eventually only interested in the AccMode and needToRetract is just temporarily needed to compute the correct AccMode. Hence, I would split this into two traits.
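One possible shape of that split, sketched with immutable case classes instead of a class with var fields. The names NeedToRetractTrait and AccModeTrait are hypothetical, and the sketch leaves out the Calcite RelTrait and RelTraitDef plumbing that real traits would need. The defaults follow the initialization described in the rule's Scaladoc (needToRetract false, AccMode Acc).

```scala
object SplitTraitsSketch {
  sealed trait AccMode
  case object Acc extends AccMode
  case object AccRetract extends AccMode

  // Hypothetical trait 1: only needed while the AccMode is being derived.
  final case class NeedToRetractTrait(needToRetract: Boolean)

  // Hypothetical trait 2: the property that matters after decoration.
  final case class AccModeTrait(accMode: AccMode)

  // Defaults as described for the initialization rule.
  val DefaultNeedToRetract: NeedToRetractTrait = NeedToRetractTrait(needToRetract = false)
  val DefaultAccMode: AccModeTrait = AccModeTrait(Acc)
}
```

Because both classes are immutable, a rule would replace a trait with a new instance (for example via copy) rather than mutate it.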
Hi @fhueske, thanks for your review and refactorings. I think they are pretty good and I have learned a lot from them. I left a few comments in your PR. As for the hardcoding problem, I think you are right: it's better to extend DataStreamRel with a couple of methods. I have added some methods to DataStreamRel, so you can use them directly when you squash your PR into my commit. Thanks!
@fhueske, thanks for the review and valuable comments. Thanks,
Hi @hequn8128 and @shaoxuan-wang, I merged this PR to the
Add RetractionRules at the stage of decoration. These rules derive the NeedRetraction property and the accumulating mode. There are three rules:
1. InitProcessRule. This rule initializes the NeedRetraction property and AccMode for DataStreamRels.
2. NeedToRetractProcessRule. This rule derives the NeedRetraction property.
3. AccModeProcessRule. This rule derives the accumulating mode, that is, it finds all AccRetract nodes.
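The three steps can be sketched as a single recursive pass over an immutable plan tree. This is not the PR's implementation: the Node and Annotated classes, the derive function, and the exact propagation conditions are assumptions chosen to illustrate the idea, using the flag names mentioned in the review thread.

```scala
object RetractionDerivationSketch {
  sealed trait AccMode
  case object Acc extends AccMode
  case object AccRetract extends AccMode

  // Hypothetical plan node; the three flags mirror the methods named in the review.
  final case class Node(
      name: String,
      needsUpdatesAsRetraction: Boolean = false,
      producesUpdates: Boolean = false,
      consumesRetractions: Boolean = false,
      children: List[Node] = Nil)

  final case class Annotated(
      name: String,
      needToRetract: Boolean,
      accMode: AccMode,
      children: List[Annotated])

  // Step 1 (init) is the default: needToRetract = false, mode Acc.
  def derive(node: Node, needToRetract: Boolean = false): Annotated = {
    // Step 2 (assumed condition): children must retract if this node asks for it,
    // or if this node must retract itself and only forwards its input.
    val childrenMustRetract =
      node.needsUpdatesAsRetraction || (needToRetract && !node.consumesRetractions)
    val kids = node.children.map(derive(_, childrenMustRetract))

    // Step 3 (assumed condition): a node is AccRetract if it turns its own updates
    // into retractions, or if it forwards retractions coming from a child.
    val emitsOwnRetractions = node.producesUpdates && needToRetract
    val forwardsRetractions =
      !node.consumesRetractions && kids.exists(_.accMode == AccRetract)
    val mode = if (emitsOwnRetractions || forwardsRetractions) AccRetract else Acc

    Annotated(node.name, needToRetract, mode, kids)
  }
}
```

For two stacked aggregations over a source, this sketch marks only the inner aggregation as AccRetract: the outer one requests retractions from its input but, with nothing above it asking for them, stays in Acc mode.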
General
Documentation
Tests & Build
mvn clean verify has been executed successfully locally or a Travis build has passed