Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
made the instructionsToPipeline method functional-style w/ no mutability
  • Loading branch information
tomerk committed Feb 1, 2016
1 parent 7689544 commit d1b970a
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions src/main/scala/workflow/WorkflowUtils.scala
@@ -1,57 +1,49 @@
package workflow

import scala.collection.mutable.ArrayBuffer

object WorkflowUtils {
def instructionsToPipeline[A, B](instructions: Seq[Instruction]): Pipeline[A, B] = {
val nodes = new ArrayBuffer[Node]()
val dataDeps = new ArrayBuffer[Seq[Int]]()
val fitDeps = new ArrayBuffer[Option[Int]]()
val instructionIdToNodeId = scala.collection.mutable.Map.empty[Int, Int]
instructionIdToNodeId.put(Pipeline.SOURCE, Pipeline.SOURCE)
val (newNodes, newDataDeps, newFitDeps, _) = instructions.indices.foldLeft(
(Seq[Node](), Seq[Seq[Int]](), Seq[Option[Int]](), Map(Pipeline.SOURCE -> Pipeline.SOURCE))
) {
case ((nodes, dataDeps, fitDeps, idMap), instruction) =>
instructions(instruction) match {
case est: EstimatorNode => (nodes, dataDeps, fitDeps, idMap)
case transformer: TransformerNode => (nodes, dataDeps, fitDeps, idMap)
case source: SourceNode =>
(nodes :+ source, dataDeps :+ Seq(), fitDeps :+ None, idMap + (instruction -> nodes.length))
case TransformerApplyNode(transformer, inputs) => {
instructions(transformer) match {
case transformerNode: TransformerNode => (
nodes :+ transformerNode,
dataDeps :+ inputs.map(idMap.apply),
fitDeps :+ None,
idMap + (instruction -> nodes.length))

for (instruction <- instructions.indices) {
instructions(instruction) match {
case est: EstimatorNode => Unit
case transformer: TransformerNode => Unit
case source: SourceNode => {
instructionIdToNodeId.put(instruction, nodes.length)
nodes.append(source)
dataDeps.append(Seq())
fitDeps.append(None)
}
case TransformerApplyNode(transformer, inputs) => {
instructions(transformer) match {
case transformerNode: TransformerNode => {
instructionIdToNodeId.put(instruction, nodes.length)
nodes.append(transformerNode)
dataDeps.append(inputs.map(instructionIdToNodeId.apply))
fitDeps.append(None)
}
case EstimatorFitNode(est, estInputs) => {
instructionIdToNodeId.put(instruction, nodes.length)
case EstimatorFitNode(est, estInputs) => (
nodes :+ new DelegatingTransformerNode(
s"Fit[${instructions(est).asInstanceOf[EstimatorNode].label}]"),
dataDeps :+ inputs.map(idMap.apply),
fitDeps :+ Some(idMap(transformer)),
idMap + (instruction -> nodes.length))

val label = s"Fit[${instructions(est).asInstanceOf[EstimatorNode].label}]"
nodes.append(new DelegatingTransformerNode(label))
dataDeps.append(inputs.map(instructionIdToNodeId.apply))
fitDeps.append(Some(instructionIdToNodeId(transformer)))
case _ => throw new RuntimeException("Transformer apply instruction must point at a Transformer")
}
case _ => throw new RuntimeException("Transformer apply instruction must point at a Transformer")
}
case EstimatorFitNode(est, inputs) => (
nodes :+ instructions(est).asInstanceOf[EstimatorNode],
dataDeps :+ inputs.map(idMap.apply),
fitDeps :+ None,
idMap + (instruction -> nodes.length))
}
case EstimatorFitNode(est, inputs) => {
val estimatorNode = instructions(est).asInstanceOf[EstimatorNode]
instructionIdToNodeId.put(instruction, nodes.length)
nodes.append(estimatorNode)
dataDeps.append(inputs.map(instructionIdToNodeId.apply))
fitDeps.append(None)
}
}
}

new ConcretePipeline(nodes.toSeq, dataDeps.toSeq, fitDeps.toSeq, nodes.length - 1)
new ConcretePipeline(newNodes, newDataDeps, newFitDeps, newNodes.length - 1)
}

/**
* Linearizes a pipeline DAG into a Seq[Instruction]
* by walking backwards from the sink in a depth-first manner
*/
def pipelineToInstructions[A, B](pipeline: Pipeline[A, B]): Seq[Instruction] = {
val nodes = pipeline.nodes
val dataDeps = pipeline.dataDeps
Expand Down

0 comments on commit d1b970a

Please sign in to comment.