Skip to content

Commit

Permalink
more prefix progress
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed May 26, 2016
1 parent e0898b9 commit a744435
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/workflow/graph/DanglingNodeRemovalRule.scala
Expand Up @@ -9,8 +9,8 @@ object DanglingNodeRemovalRule extends Rule {
case (ancestors, sink) => ancestors ++ AnalysisUtils.getAncestors(plan, sink)
}

val nodesToRemove = plan.nodes -- ancestorsOfSinks.collect{ case node: NodeId => node }
val sourcesToRemove = plan.sources -- ancestorsOfSinks.collect{ case source: SourceId => source }
val nodesToRemove = plan.nodes -- ancestorsOfSinks.collect { case node: NodeId => node }
val sourcesToRemove = plan.sources -- ancestorsOfSinks.collect { case source: SourceId => source }

val afterSourceRemoval = sourcesToRemove.foldLeft(plan) {
case (curPlan, sourceToRemove) => curPlan.removeSource(sourceToRemove)
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/workflow/graph/DefaultOptimizer.scala
Expand Up @@ -5,6 +5,7 @@ package workflow.graph
*/
object DefaultOptimizer extends Optimizer {
protected val batches: Seq[Batch] =
Batch("Load Saved State", Once, ExtractSaveablePrefixes, SavedStateLoadRule, DanglingNodeRemovalRule) ::
Batch("Common Sub-expression Elimination", FixedPoint(Int.MaxValue), EquivalentNodeMergeRule) ::
Nil
}
11 changes: 11 additions & 0 deletions src/main/scala/workflow/graph/ExtractSaveablePrefixes.scala
@@ -0,0 +1,11 @@
package workflow.graph

/**
* Extract the prefixes whose state we want to save (aka for any cacher or estimator node)
*/
object ExtractSaveablePrefixes extends Rule {
override def apply(plan: Graph, prefixes: Map[NodeId, Prefix]): (Graph, Map[NodeId, Prefix]) = {
// TODO FIXME
(plan, prefixes)
}
}

0 comments on commit a744435

Please sign in to comment.