Skip to content

Commit

Permalink
progress on utils for a new internal graph format
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed Apr 8, 2016
1 parent 0362165 commit 5efe012
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions src/main/scala/workflow/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,131 @@ private[workflow] case class EstimatorFitNode(est: Int, inputs: Seq[Int]) extend
}
}

sealed trait GraphId {
val id: Long
}

sealed trait NodeOrSourceID extends GraphId
case class NodeId(id: Long) extends NodeOrSourceID
case class SourceId(id: Long) extends NodeOrSourceID
case class SinkId(id: Long) extends GraphId

// Currently this only contains DAG manipulation utils
case class InstructionGraph(
instructions: Map[NodeId, (Node, Seq[NodeOrSourceID])],
sources: Seq[SourceId],
sinks: Seq[(SinkId, NodeOrSourceID)]
) {

// Analysis utils
def maxId: Long = {
val allIds = instructions.keys.map(_.id) ++ sources.map(_.id) ++ sinks.map(_._1.id)
allIds.max
}

// Other Analysis Utils:
// getParents
// getAncestors
// getChildren
// getDescendents



def addEdge(a: GraphId, b: NodeOrSourceID): InstructionGraph = addEdges(Seq((a, b)))
def addEdges(edges: Seq[(GraphId, NodeOrSourceID)]): InstructionGraph = {
// require that all edges connect to existing ids

// can connect sink -> anything else, but it removes that as a sink (but may take multiple edges w/ the same sink in this method call...)
// can connect anything -> source, but it removes that as a source (but may only take one edge that connects to that source in this method call...)
// Also means: can connect sink -> source


// To do this:
// First add all node -> node and node -> source connections
//
}

def addSinks(nodes: Seq[NodeOrSourceID]): (InstructionGraph, Seq[SinkId]) = {
require(nodes.forall {
case sourceId: SourceId => sources.contains(sourceId)
case nodeId: NodeId => instructions.contains(nodeId)
}, "All node ids being assigned to a sink must be nodes or sources in the graph.")

val newIdStart = maxId + 1
val newSinks = nodes.zipWithIndex.map {
case (nodeId, index) =>
(SinkId(newIdStart + index), nodeId)
}
val newGraph = this.copy(sinks = this.sinks ++ newSinks)

(newGraph, newSinks.map(_._1))
}

def addSink(node: NodeOrSourceID): (InstructionGraph, SinkId) = {
val (graph, newSinks) = addSinks(Seq(node))
(graph, newSinks.head)
}

def addSources(numSources: Int): (InstructionGraph, Seq[SourceId]) = {
val newIdStart = maxId + 1
val newSources = (0 until numSources).map(i => SourceId(newIdStart + i))
val newGraph = this.copy(sources = this.sources ++ newSources)
(newGraph, newSources)
}

def addSource(): (InstructionGraph, SourceId) = {
val (graph, newSources) = addSources(1)
(graph, newSources.head)
}

def addNodes(newNodes: Seq[Node]): (InstructionGraph, Seq[NodeId]) = {
val newIdStart = maxId + 1
val newNodesWithIds = newNodes.zipWithIndex.map {
case (node, index) =>
(NodeId(newIdStart + index), (node, Seq[NodeOrSourceID]()))
}

val newGraph = this.copy(instructions = this.instructions ++ newNodesWithIds)
(newGraph, newNodesWithIds.map(_._1))
}

def addNode(node: Node): (InstructionGraph, NodeId) = {
val (graph, newNodes) = addNodes(Seq(node))
(graph, newNodes.head)
}

// Need to add Utils to remove nodes, edges, sinks, sources
def removeEdges(edges: Set[(NodeOrSourceID, NodeId)]): InstructionGraph
def removeEdge(a: NodeOrSourceID, b: NodeId): InstructionGraph = removeEdges(Set((a, b)))

def removeSinks(sinksToRemove: Set[SinkId]): InstructionGraph = {
val newSinks = sinks.filter(sink => !sinksToRemove.contains(sink._1))
this.copy(sinks = newSinks)
}
def removeSink(sink: SinkId): InstructionGraph = removeSinks(Set(sink))

// Throw an error if there are still edges connected to the sourceIds
def removeSources(sourcesToRemove: Set[SourceId]): InstructionGraph
def removeSource(source: SourceId): InstructionGraph = removeSources(Set(source))

// when removing a node: turn all of its input deps into sinks, and anything that depends on it into a source
// when removing multiple nodes: remove all edges in between them, then do the above (all ingress into sinks, all egress into sources)
def removeNodes(nodes: Set[NodeId]): (InstructionGraph, Seq[SourceId], Seq[SinkId])
def removeNode(node: NodeId): (InstructionGraph, Seq[SourceId], Seq[SinkId]) = removeNodes(Set(node))

// Util to combine w/ another graph (potentially connecting/splicing some of the endpoints, but not necessarily)
// Do I leave sinks that were spliced to? (I definitely don't leave sources that were spliced to)
// For consistency, I won't leave sinks that were spliced to.
def combine(
otherGraph: InstructionGraph,
otherSourceToThisSink: Map[SourceId, SinkId],
thisSourceToOtherSink: Map[SourceId, SinkId]
): InstructionGraph

// Maybe also a util to re-assign all Node, Source, & Sink ids to not collide w/ a given set?
// - map each GraphId to a unique index
// - get a "non-collision" index
}
sealed trait Node {
def label: String = {
val className = getClass.getSimpleName
Expand Down

0 comments on commit 5efe012

Please sign in to comment.