Skip to content

Commit

Permalink
Gradual progress on incremental pipeline construction continues
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed May 19, 2016
1 parent d859551 commit 8018eb1
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 31 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/workflow/graph/AnalysisUtils.scala
Expand Up @@ -85,14 +85,14 @@ private[graph] object AnalysisUtils {
* @param id A node/sink/source in the graph
* @return The ancestors of that id in sorted topological order
*/
def linearize(graph: Graph, id: GraphId): Seq[GraphId] = {
val deps: Seq[GraphId] = id match {
def linearize(graph: Graph, id: GraphId): Seq[NodeOrSourceId] = {
val deps: Seq[NodeOrSourceId] = id match {
case source: SourceId => Seq()
case node: NodeId => graph.getDependencies(node)
case sink: SinkId => Seq(graph.getSinkDependency(sink))
}

deps.foldLeft(Seq[GraphId]()) {
deps.foldLeft(Seq[NodeOrSourceId]()) {
case (linearization, dep) => if (!linearization.contains(dep)) {
linearization ++ linearize(graph, dep).filter(id => !linearization.contains(id)) :+ dep
} else {
Expand Down
71 changes: 53 additions & 18 deletions src/main/scala/workflow/graph/Pipeline.scala
Expand Up @@ -8,8 +8,23 @@ import pipelines.Logging

import scala.reflect.ClassTag

class GraphExecutor(val graph: Graph, val optimizer: Option[Optimizer]) {
private lazy val optimizedGraph: Graph = optimizer.map(_.execute(graph)).getOrElse(graph)
class GraphExecutor(val graph: Graph, val optimizer: Option[Optimizer], initExecutionState: Map[GraphId, Expression] = Map()) {
private var optimized: Boolean = false
private lazy val optimizedState: (Graph, scala.collection.mutable.Map[GraphId, Expression]) = {
optimized = true
val (newGraph, newExecutionState) = optimizer.map(_.execute(graph, initExecutionState))
.getOrElse((graph, initExecutionState))
(newGraph, scala.collection.mutable.Map() ++ newExecutionState)
}
private lazy val optimizedGraph: Graph = optimizedState._1
private lazy val executionState: scala.collection.mutable.Map[GraphId, Expression] = optimizedState._2

// TODO: FIXME: Maybe make separate methods for current graph state and current execution state?
def currentState: (Graph, Map[GraphId, Expression]) = if (optimized) {
(optimizedGraph, executionState.toMap)
} else {
(graph, initExecutionState)
}

// Todo put comment: A result is unstorable if it implicitly depends on any source
private lazy val unstorableResults: Set[GraphId] = {
Expand All @@ -18,7 +33,22 @@ class GraphExecutor(val graph: Graph, val optimizer: Option[Optimizer]) {
}
}

private val executionResults: scala.collection.mutable.Map[GraphId, Expression] = scala.collection.mutable.Map()
// TODO rename and comment. This method executes & stores all ancestors of a sink that don't depend on sources
def executeAndSaveWithoutSources(sink: SinkId): Unit = {
val linearizedAncestors = AnalysisUtils.linearize(optimizedGraph, sink)
val ancestorsToExecute = linearizedAncestors.filter(id => !unstorableResults.contains(id))
ancestorsToExecute.foreach {
case source: SourceId => throw new RuntimeException("Linearized ancestors to execute should not contain sources")
case node: NodeId => {
val dependencies = optimizedGraph.getDependencies(node)
val depExpressions = dependencies.map(dep => executionState(dep))
val operator = optimizedGraph.getOperator(node)
executionState(node) = operator.execute(depExpressions)
}
}

executionState(sink) = executionState(optimizedGraph.getSinkDependency(sink))
}

private def getUncachedResult(graphId: GraphId, sources: Map[SourceId, Expression]): Expression = {
graphId match {
Expand All @@ -40,7 +70,7 @@ class GraphExecutor(val graph: Graph, val optimizer: Option[Optimizer]) {
if (unstorableResults.contains(graphId)) {
getUncachedResult(graphId, sources)
} else {
executionResults.getOrElseUpdate(graphId, getUncachedResult(graphId, sources))
executionState.getOrElseUpdate(graphId, getUncachedResult(graphId, sources))
}
}

Expand Down Expand Up @@ -114,9 +144,10 @@ object PipelineRDDUtils {
}
}

trait Pipeline[A, B] extends GraphBackedExecution {
def getSource: SourceId = getSources.head
def getSink: SinkId = getSinks.head
trait Pipeline[A, B] {
private[graph] val source: SourceId
private[graph] val sink: SinkId
private[graph] def executor: GraphExecutor

final def apply(datum: A): PipelineDatumOut[B] = apply(PipelineRDDUtils.datumToPipelineDatumOut(datum))

Expand All @@ -135,8 +166,10 @@ trait Pipeline[A, B] extends GraphBackedExecution {

// TODO: Clean up this method
final def andThen[C](next: Pipeline[B, C]): Pipeline[A, C] = {
val (newGraph, _, _, newSinkMappings) = getGraph.connectGraph(next.getGraph, Map(next.getSource -> getSink))
new ConcretePipeline(new GraphExecutor(newGraph, getOptimizer), getSource, newSinkMappings(next.getSink))
val (newGraph, newSourceMappings, newNodeMappings, newSinkMappings) = executor.currentState._1.connectGraph(next.executor.currentState._1, Map(next.source -> sink))
val graphIdMappings: Map[GraphId, GraphId] = newSourceMappings ++ newNodeMappings ++ newSinkMappings
val newExecutionState = executor.currentState._2 ++ next.executor.currentState._2.map(x => (graphIdMappings(x._1), x._2))
new ConcretePipeline(new GraphExecutor(newGraph, executor.optimizer, newExecutionState), source, newSinkMappings(next.sink))
}

final def andThen[C](est: Estimator[B, C], data: RDD[A]): Pipeline[A, C] = {
Expand Down Expand Up @@ -165,16 +198,16 @@ trait Pipeline[A, B] extends GraphBackedExecution {

}

class ConcretePipeline[A, B](executor: GraphExecutor, source: SourceId, sink: SinkId) extends Pipeline[A, B] {
setExecutor(executor)
setSources(Seq(source))
setSinks(Seq(sink))
}
class ConcretePipeline[A, B](
@Override private[Graph] val executor: GraphExecutor,
@Override private[Graph] val source: SourceId,
@Override private[Graph] val sink: SinkId
) extends Pipeline[A, B]

abstract class Transformer[A, B : ClassTag] extends TransformerOperator with Pipeline[A, B] {
setExecutor(new GraphExecutor(Graph(Set(SourceId(0)), Map(SinkId(0) -> NodeId(0)), Map(NodeId(0) -> this), Map(NodeId(0) -> Seq(SourceId(0)))), None))
setSources(Seq(SourceId(0)))
setSinks(Seq(SinkId(0)))
@Override @transient private[Graph] lazy val executor = new GraphExecutor(Graph(Set(SourceId(0)), Map(SinkId(0) -> NodeId(0)), Map(NodeId(0) -> this), Map(NodeId(0) -> Seq(SourceId(0)))), None)
@Override private[Graph] val source = SourceId(0)
@Override private[Graph] val sink = SinkId(0)

protected def singleTransform(in: A): B
protected def batchTransform(in: RDD[A]): RDD[B] = in.map(singleTransform)
Expand Down Expand Up @@ -212,6 +245,7 @@ abstract class Estimator[A, B] extends EstimatorOperator {
val (almostFinalGraph, delegatingId) = newGraphWithSource.addNode(new DelegatingOperator, Seq(nodeId, sourceId))
val (finalGraph, sinkId) = almostFinalGraph.addSink(delegatingId)

// FIXME TODO: MAINTAIN STATE!
new ConcretePipeline(new GraphExecutor(finalGraph, data.getExecutor.optimizer), sourceId, sinkId)
}

Expand All @@ -237,6 +271,7 @@ abstract class LabelEstimator[A, B, L] extends EstimatorOperator {
val (almostFinalGraph, delegatingId) = newGraphWithSource.addNode(new DelegatingOperator, Seq(nodeId, sourceId))
val (finalGraph, sinkId) = almostFinalGraph.addSink(delegatingId)

// FIXME TODO: MAINTAIN STATE!
new ConcretePipeline(new GraphExecutor(finalGraph, data.getExecutor.optimizer), sourceId, sinkId)
}

Expand Down Expand Up @@ -315,7 +350,7 @@ object Pipeline {
}
}

def tie(graphBackedExecutions: Seq[GraphBackedExecution], optimizer: Option[Optimizer]): Unit = {
def submit(graphBackedExecutions: Seq[GraphBackedExecution], optimizer: Option[Optimizer]): Unit = {
GraphBackedExecution.tie(graphBackedExecutions, optimizer)
}
}
2 changes: 1 addition & 1 deletion src/main/scala/workflow/graph/Rule.scala
Expand Up @@ -11,5 +11,5 @@ abstract class Rule {
if (className endsWith "$") className.dropRight(1) else className
}

def apply(plan: Graph): Graph
def apply(plan: Graph, executionState: Map[GraphId, Expression]): (Graph, Map[GraphId, Expression])
}
12 changes: 6 additions & 6 deletions src/main/scala/workflow/graph/RuleExecutor.scala
Expand Up @@ -26,8 +26,8 @@ abstract class RuleExecutor extends Logging {
* Executes the batches of rules defined by the subclass. The batches are executed serially
* using the defined execution strategy. Within each batch, rules are also executed serially.
*/
def execute(plan: Graph): Graph = {
var curPlan = plan
def execute(plan: Graph, executionState: Map[GraphId, Expression]): (Graph, Map[GraphId, Expression]) = {
var curPlan = (plan, executionState)

batches.foreach { batch =>
val batchStartPlan = curPlan
Expand All @@ -39,13 +39,13 @@ abstract class RuleExecutor extends Logging {
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (prevPlan, rule) =>
val result = rule(prevPlan)
val result = rule(prevPlan._1, prevPlan._2)
if (!(result == prevPlan)) {
logTrace(
s"""
|=== Applying Rule ${rule.ruleName} ===
|${prevPlan.toDOTString}
|${result.toDOTString}
|${prevPlan._1.toDOTString}
|${result._1.toDOTString}
""".stripMargin)
}

Expand Down Expand Up @@ -73,7 +73,7 @@ abstract class RuleExecutor extends Logging {
s"""
|=== Result of Batch ${batch.name} ===
|${plan.toDOTString}
|${curPlan.toDOTString}
|${curPlan._1.toDOTString}
""".stripMargin)
} else {
logTrace(s"Batch ${batch.name} has no effect.")
Expand Down
6 changes: 3 additions & 3 deletions src/test/scala/workflow/graph/PipelineSuite.scala
Expand Up @@ -40,7 +40,7 @@ class PipelineSuite extends FunSuite with LocalSparkContext with Logging {

val pipelineOut = pipeline(data)
val pipelineOut2 = pipeline(data)
Pipeline.tie(Seq(pipelineOut, pipelineOut2), Some(DefaultOptimizer))
Pipeline.submit(Seq(pipelineOut, pipelineOut2), Some(DefaultOptimizer))

pipelineOut.get().collect()
pipelineOut2.get().collect()
Expand Down Expand Up @@ -73,7 +73,7 @@ class PipelineSuite extends FunSuite with LocalSparkContext with Logging {
val pipelineOutTwo = pipelineChainTwo(data)
val modelOut = model(data)

Pipeline.tie(Seq(pipelineOutOne, pipelineOutTwo, modelOut), None)
Pipeline.submit(Seq(pipelineOutOne, pipelineOutTwo, modelOut), None)

assert(pipelineOutOne.get().collect().toSeq === Seq(32*2 + 32*2, 94*2 + 32*2, 12*2 + 32*2))
assert(pipelineOutTwo.get().collect().toSeq === Seq(32*2 + 32*2, 94*2 + 32*2, 12*2 + 32*2))
Expand Down Expand Up @@ -106,7 +106,7 @@ class PipelineSuite extends FunSuite with LocalSparkContext with Logging {
val pipelineOutTwo = pipelineChainTwo(data)
val modelOut = model(data)

Pipeline.tie(Seq(pipelineOutOne, pipelineOutTwo, modelOut, features), None)
Pipeline.submit(Seq(pipelineOutOne, pipelineOutTwo, modelOut, features), None)

assert(pipelineOutOne.get().collect().toSeq === Seq(32*2 + 32*2 + 10, 94*2 + 32*2 + 10, 12*2 + 32*2 + 10))
assert(pipelineOutTwo.get().collect().toSeq === Seq(32*2 + 32*2 + 10, 94*2 + 32*2 + 10, 12*2 + 32*2 + 10))
Expand Down

0 comments on commit 8018eb1

Please sign in to comment.