Skip to content

Commit

Permalink
made executor save state if non-empty prefix for a node
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed May 26, 2016
1 parent 4794e4b commit e0898b9
Showing 1 changed file with 10 additions and 18 deletions.
28 changes: 10 additions & 18 deletions src/main/scala/workflow/graph/GraphExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package workflow.graph
* Warning: Not thread-safe!
*
* @param graph The underlying graph to execute
* @param state A mapping of GraphId to computed Expression.
* This is the current state of what has already been executed in the graph.
* @param optimize Whether or not to optimize the graph using the global executor before executing anything.
* Defaults to True.
*/
Expand Down Expand Up @@ -44,21 +42,6 @@ private[graph] class GraphExecutor(val graph: Graph, optimize: Boolean = true) {
}
}

/**
* Partially execute a graph up to an input GraphId as far as is possible.
* No node will be executed that depends on unconnected sources, but ancestors the input id
* that do not depend on sources may be executed.
*
* This method updates the internal execution state of the GraphExecutor.
*
* @param graphId The GraphId to execute up to and including if possible.
*/
def partialExecute(graphId: GraphId): Unit = {
val linearization = AnalysisUtils.linearize(optimizedGraph, graphId) :+ graphId
val idsToExecute = linearization.filter(id => !sourceDependants.contains(id))
idsToExecute.foreach(execute)
}

/**
* Execute the graph up to and including an input graph id, and return the result
* of execution at that id.
Expand All @@ -77,7 +60,16 @@ private[graph] class GraphExecutor(val graph: Graph, optimize: Boolean = true) {
val dependencies = optimizedGraph.getDependencies(node)
val depExpressions = dependencies.map(dep => execute(dep))
val operator = optimizedGraph.getOperator(node)
operator.execute(depExpressions)
val expression = operator.execute(depExpressions)

// Save state if prefix exists!
// Note: This whole process isn't threadsafe & things could get messed up if
// multiple pipelines are executing and/or optimizing at once
if (prefixes.contains(node)) {
Pipeline.state(prefixes(node)) = expression
}

expression
}
case sink: SinkId => {
val sinkDep = optimizedGraph.getSinkDependency(sink)
Expand Down

0 comments on commit e0898b9

Please sign in to comment.