Enhance incremental computation support in Texera#2165
Conversation
Yicong-Huang
left a comment
There was a problem hiding this comment.
The PR looks good and clean! Left some small comments in code. Although, I am not very sure about the new incremental join operator, what is it behavior if the inputs are already retractable?
| var rewrittenLogicalPlan = | ||
| WorkflowCacheRewriter.transform(logicalPlan, opResultStorage, opsToReuseCache) | ||
| rewrittenLogicalPlan.operatorMap.values.foreach(initOperator) | ||
|
|
||
| // perform rewrite to enforce progressive computation constraints | ||
| rewrittenLogicalPlan = ProgressiveRetractionEnforcer.enforceDelta(rewrittenLogicalPlan, context) |
There was a problem hiding this comment.
I suggest creating a new variable name for each step of the rewrite, as they are rewrites with different purposes.
| private def shouldEmitOutput(): Boolean = { | ||
| System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS | ||
| } | ||
|
|
||
| private def emitOutputAndResetState(): scala.Iterator[Tuple] = { | ||
| lastUpdatedTime = System.currentTimeMillis | ||
| val resultIterator = getPartialOutputs() | ||
| this.partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]() | ||
| resultIterator | ||
| } |
There was a problem hiding this comment.
I see similar code for partial and final aggregate operators to do time-based snapshots to push partial results out. If the time-based snapshot is a universal strategy for incremental operators to push out partial results, is it better to make it a standard framework?
| */ | ||
| public BuilderV2 addSequentially(Object[] fields) { | ||
| checkNotNull(fields); | ||
| checkSchemaMatchesFields(schema.getAttributes(), Lists.newArrayList(fields)); |
There was a problem hiding this comment.
I think we need such an assertion for the normal tuple fields. if we need to add new fields (e.g., retraction or not), we can treat it separately? If so, I can do it in a future PR.
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| object ProgressiveRetractionEnforcer { |
There was a problem hiding this comment.
Could you add some doc to explain this enforcer's duty?
| } | ||
| Schema | ||
| .newBuilder() | ||
| .add(ProgressiveUtils.insertRetractFlagAttr) |
There was a problem hiding this comment.
Alternatively, we can have a Builder.allowRetract() to add this attribute internally for users?
| val builder = Tuple | ||
| .newBuilder(operatorSchemaInfo.outputSchemas(0)) | ||
| .add(left) |
There was a problem hiding this comment.
Is there a case where the input left tuples and/or right tuples are already supporting retraction?
|
will revisit after complier refactoring. |
|
@zuozhiw do you want to work with me to finish this PR? |
This PR enhances incremental computation support in Texera, including:
PartialAggregateOpExecandFinalAggregateOpExecare updated to use incremental computation. They will perdoically emit partial results to downstream.AggregateandLineChartoperators now use the new aggregation framework. Other aggregate-based visualizations are not using it as they are now implemented with Python UDFs and HTML visualizations.WordCloudis not using the new framework, asWordCloudis a special top-k aggregation.supportRetractableInputto indicate whether an operator support retractions as input tuples.For detailed technical presentation on incremental computation, see this slide and descriptions in this PR