Skip to content

Commit

Permalink
progress on adding documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed Apr 27, 2016
1 parent 88d9d84 commit 220d5bc
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 24 deletions.
60 changes: 52 additions & 8 deletions src/main/scala/internal/AnalysisUtils.scala
@@ -1,13 +1,17 @@
package internal

object AnalysisUtils {
// util to get ancestors as tree?
// util to topologically sort ancestors?
// FIXME: Linearize won't be deterministic currently
def linearize(graph: Graph): Seq[GraphId] = {
def linearize(graphId: GraphId): Seq[GraphId] = {
val deps: Seq[GraphId] = graphId match {
case source: SourceId => Seq()
case node: NodeId => graph.getDependencies(node)
case sink: SinkId => Seq(graph.getSinkDependency(sink))
}

deps.foldLeft(Seq[GraphId]()) {
case (linearization, dep) => if (!linearization.contains(dep)) {
linearization ++ linearize(dep).filter(id => !linearization.contains(id))
Expand All @@ -24,8 +28,18 @@ object AnalysisUtils {
}
}

def getChildren(graph: Graph, node: GraphId): Set[GraphId] = {
node match {
/**
* Given a graph and a source/sink/node, output the set of all sources/sinks/nodes
* that directly depend on the source/sink/node.
*
* i.e. All ids with a direct dependency on this source/sink/node.
*
* @param graph The graph to use
* @param id The node/sink/source in the graph
* @return The set of all children of that id
*/
def getChildren(graph: Graph, id: GraphId): Set[GraphId] = {
id match {
case id: NodeOrSourceId => {
val childrenNodes = graph.dependencies.filter(_._2.contains(id)).keySet
val childrenSinks = graph.sinkDependencies.filter(_._2 == id).keySet
Expand All @@ -35,23 +49,53 @@ object AnalysisUtils {
}
}

def getDescendants(graph: Graph, node: GraphId): Set[GraphId] = {
val children = getChildren(graph, node)
/**
 * Collect every id that depends on the given source/sink/node, whether directly
 * (its children) or transitively (children of children, and so on).
 *
 * The result is built by taking each child of `id` together with that child's
 * own descendants and merging everything into one set.
 *
 * @param graph The graph to search
 * @param id The node/sink/source whose descendants are wanted
 * @return The set of all descendants of that id
 */
def getDescendants(graph: Graph, id: GraphId): Set[GraphId] = {
  getChildren(graph, id).flatMap { child =>
    // Each child contributes itself plus everything downstream of it.
    getDescendants(graph, child) + child
  }
}

def getParents(graph: Graph, node: GraphId): Set[NodeOrSourceId] = {
node match {
/**
 * Look up the direct dependencies (parents) of a source/sink/node.
 *
 *  - A source has no parents, so the result is empty.
 *  - A node's parents are its dependencies as reported by the graph, collapsed
 *    into a set (ordering and repeated entries are not preserved).
 *  - A sink's only parent is its single dependency.
 *
 * @param graph The graph to use
 * @param id A node/sink/source in the graph
 * @return The set of all parents of that id
 */
def getParents(graph: Graph, id: GraphId): Set[NodeOrSourceId] = id match {
  case _: SourceId => Set.empty[NodeOrSourceId]
  case nodeId: NodeId => graph.getDependencies(nodeId).toSet
  case sinkId: SinkId => Set(graph.getSinkDependency(sinkId))
}

def getAncestors(graph: Graph, node: GraphId): Set[NodeOrSourceId] = {
val parents = getParents(graph, node)
/**
* Given a graph and a source/sink/node, output the set of all nodes
* and sources that are ancestors of that source/sink/node in the graph.
*
* i.e. All ids that source/sink/node has explicit or implicit dependencies on.
*
* @param graph The graph to use
* @param id A node/sink/source in the graph
* @return The set of all ancestors of that id
*/
def getAncestors(graph: Graph, id: GraphId): Set[NodeOrSourceId] = {
val parents = getParents(graph, id)
parents.map {
parent => getAncestors(graph, parent) + parent
}.fold(Set())(_ union _)
Expand Down
168 changes: 152 additions & 16 deletions src/main/scala/internal/Graph.scala
@@ -1,16 +1,64 @@
package internal

class Graph(
val sources: Set[SourceId],
val sinkDependencies: Map[SinkId, NodeOrSourceId],
val operators: Map[NodeId, Operator],
val dependencies: Map[NodeId, Seq[NodeOrSourceId]]
/**
* To represent our Keystone workloads under the hood at the lowest level, we use a dataflow-esque DAG-like structure.
* Data flows through vertices that manipulate it. At this level, everything is fully untyped! All type information
* as it relates to ML-specific ideas is captured at higher API levels. More specifically, our workload graphs are
* made up of three types of vertices, Nodes, Sources, and Sinks, with edges between them.
*
* Nodes consist of a unique id, and an Operator. Each Node also has zero, one, or multiple ordered dependencies
* on Nodes and Sources elsewhere in the graph. These dependencies are represented as incoming edges in the DAG,
* with data flowing into the node. Dependency order matters, as the incoming data is passed directly to the
* Operator with each dependency as a separate parameter. Operators may take zero, one, or multiple ordered
* input parameters, and always produce a single output.
*
* Next, we have Sources, a special type of vertex. A source is an incomplete input connection coming in from the
* outside world. A workload cannot be executed without providing information about what data should be passed into
* each Source. Sources are also treated by utility methods as endpoints of the DAG, useful for specifying how to
* connect multiple workload DAGs together. Each Source has a unique id, and Sources always have zero
* dependencies/incoming edges.
*
* Finally, we have Sinks, another special type of vertex. Sinks each have a unique id, and exactly one dependency
* on either a source or a node. Sinks are output endpoints exposed to the outside world. They are used as markers
* during execution to specify what results of executing a workload to output. Like Sources, they are also referenced
* by utilities to connect workloads together. Each Sink has a unique id, and exactly one dependency on either
* a Node or a Source.
*
* @param sources The set of all [[SourceId]]s of sources in the graph
* @param sinkDependencies A map of [[SinkId]] to the id of the node or source the sink depends on
* @param operators A map of [[NodeId]] to the operator contained within that node
* @param dependencies A map of [[NodeId]] to the node's ordered dependencies
*/
case class Graph(
sources: Set[SourceId],
sinkDependencies: Map[SinkId, NodeOrSourceId],
operators: Map[NodeId, Operator],
dependencies: Map[NodeId, Seq[NodeOrSourceId]]
) {

/**
 * Get the set of ids of all nodes in this graph.
 *
 * A node is any id that has an entry in `operators`; the result is that map's key set.
 */
def nodes: Set[NodeId] = operators.keySet

/**
 * Get the set of ids of all sinks in this graph.
 *
 * A sink is any id that has an entry in `sinkDependencies`; the result is that map's key set.
 */
def sinks: Set[SinkId] = sinkDependencies.keySet

/**
 * Get the ordered dependencies of a given node in this graph.
 *
 * This is a direct lookup in `dependencies`, so an id that is not a key of that map
 * fails with the map's missing-key error (a NoSuchElementException for a plain Map).
 *
 * @param id The id of the node to look up
 * @return The dependencies stored for that node
 */
def getDependencies(id: NodeId): Seq[NodeOrSourceId] = dependencies(id)

/**
 * Get the dependency of a given sink in this graph.
 *
 * This is a direct lookup in `sinkDependencies`, so an id that is not a key of that map
 * fails with the map's missing-key error (a NoSuchElementException for a plain Map).
 *
 * @param id The id of the sink to look up
 * @return The node or source that the sink depends on
 */
def getSinkDependency(id: SinkId): NodeOrSourceId = sinkDependencies(id)

/**
 * Get the operator at a given node in this graph.
 *
 * This is a direct lookup in `operators`, so an id that is not a key of that map
 * fails with the map's missing-key error (a NoSuchElementException for a plain Map).
 *
 * @param id The id of the node to look up
 * @return The operator stored at that node
 */
def getOperator(id: NodeId): Operator = operators(id)

private def nextNodeIds(num: Int): Seq[NodeId] = {
Expand All @@ -32,6 +80,13 @@ class Graph(
// Allocate one source id: requests a batch of size 1 from nextSourceIds and takes its first element.
// NOTE(review): nextSourceIds is defined outside this view; presumably it returns ids not yet used in this graph — confirm.
private def nextSourceId(): SourceId = nextSourceIds(1).head
// Allocate one sink id: requests a batch of size 1 from nextSinkIds and takes its first element.
// NOTE(review): nextSinkIds is defined outside this view; presumably it returns ids not yet used in this graph — confirm.
private def nextSinkId(): SinkId = nextSinkIds(1).head

/**
* Immutably add a node to this graph.
*
* @param op The operator to be stored at the new node
* @param deps The dependencies of the new node
* @return A pair containing the new graph and the id assigned to the new node
*/
def addNode(op: Operator, deps: Seq[NodeOrSourceId]): (Graph, NodeId) = {
require ({
val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources
Expand All @@ -41,9 +96,15 @@ class Graph(
val id = nextNodeId()
val newOperators = operators.updated(id, op)
val newDependencies = dependencies.updated(id, deps)
(new Graph(sources, sinkDependencies, newOperators, newDependencies), id)
(copy(operators = newOperators, dependencies = newDependencies), id)
}

/**
* Immutably add a new sink to this graph.
*
* @param dep The dependency of the new sink
* @return A pair containing the new graph and the id assigned to the new sink
*/
def addSink(dep: NodeOrSourceId): (Graph, SinkId) = {
require ({
val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources
Expand All @@ -52,15 +113,27 @@ class Graph(

val id = nextSinkId()
val newSinkDependencies = sinkDependencies.updated(id, dep)
(new Graph(sources, newSinkDependencies, operators, dependencies), id)
(copy(sinkDependencies = newSinkDependencies), id)
}

/**
 * Immutably add a new source to this graph.
 *
 * This graph is left unchanged; the returned graph is a copy whose `sources`
 * additionally contains the newly allocated id.
 *
 * @return A pair containing the new graph and the id assigned to the new source
 */
def addSource(): (Graph, SourceId) = {
  val id = nextSourceId()
  val newSources = sources + id
  (copy(sources = newSources), id)
}

/**
* Immutably update the dependencies of a node in this graph.
*
* @param node The id of the node to be updated
* @param deps The new dependencies to assign to the node
* @return The new graph
*/
def setDependencies(node: NodeId, deps: Seq[NodeOrSourceId]): Graph = {
require(dependencies.contains(node), "Node being updated must exist")
require ({
Expand All @@ -69,16 +142,30 @@ class Graph(
}, "Node must have dependencies on existing ids")

val newDependencies = dependencies.updated(node, deps)
new Graph(sources, sinkDependencies, operators, newDependencies)
copy(dependencies = newDependencies)
}

/**
 * Immutably update the operator of a node in this graph.
 *
 * This graph is left unchanged; the returned graph is a copy in which only the
 * `operators` entry for `node` differs.
 *
 * @param node The id of the node to be updated
 * @param op The new operator to assign to the node
 * @return The new graph
 * @throws IllegalArgumentException if `node` has no entry in `dependencies`
 */
def setOperator(node: NodeId, op: Operator): Graph = {
  // NOTE(review): existence is checked against `dependencies` rather than `operators`.
  // addNode/removeNode update both maps together, so the key sets should agree — confirm.
  require(dependencies.contains(node), "Node being updated must exist")

  val newOperators = operators.updated(node, op)
  copy(operators = newOperators)
}

/**
* Immutably update the dependency of a sink in this graph.
*
* @param sink The id of the sink to be updated
* @param dep The new dependency to assign to the sink
* @return The new graph
*/
def setSinkDependency(sink: SinkId, dep: NodeOrSourceId): Graph = {
require(sinkDependencies.contains(sink), "Sink being updated must exist")

Expand All @@ -88,31 +175,60 @@ class Graph(
}, "Sink must have dependencies on an existing id")

val newSinkDependencies = sinkDependencies.updated(sink, dep)
new Graph(sources, newSinkDependencies, operators, dependencies)
copy(sinkDependencies = newSinkDependencies)
}

/**
 * Immutably remove a sink from this graph.
 *
 * This graph is left unchanged; the returned graph is a copy whose
 * `sinkDependencies` no longer contains an entry for `sink`.
 *
 * @param sink The id of the sink to remove
 * @return The new graph
 * @throws IllegalArgumentException if `sink` is not a sink of this graph
 */
def removeSink(sink: SinkId): Graph = {
  require(sinkDependencies.contains(sink), "Sink being removed must exist")

  val newSinkDependencies = sinkDependencies - sink
  copy(sinkDependencies = newSinkDependencies)
}

/**
 * Immutably remove a source from this graph.
 *
 * Note: only `sources` is changed. Node and sink dependencies are not rewritten, so
 * anything that depended on the removed source is left with a dangling dependency
 * that the caller must take care of.
 *
 * @param source The id of the source to remove
 * @return The new graph
 * @throws IllegalArgumentException if `source` is not a source of this graph
 */
def removeSource(source: SourceId): Graph = {
  require(sources.contains(source), "Source being removed must exist")

  val newSources = sources - source
  copy(sources = newSources)
}

/**
 * Immutably remove a node from this graph.
 *
 * The node's entries in both `operators` and `dependencies` are dropped.
 *
 * Note: dependencies held by other nodes and by sinks are not rewritten, so anything
 * that depended on the removed node is left with a dangling dependency that the
 * caller must take care of.
 *
 * @param node The id of the node to remove
 * @return The new graph
 * @throws IllegalArgumentException if `node` is not a node of this graph
 */
def removeNode(node: NodeId): Graph = {
  require(nodes.contains(node), "Node being removed must exist")

  val newOperators = operators - node
  val newDependencies = dependencies - node
  copy(operators = newOperators, dependencies = newDependencies)
}

/**
* Immutably replace all dependencies on a given node or source with a new target.
*
* @param oldDep The id of the old node or source that is the existing dependency target
* @param newDep The id of the node or source intended to be the new target
* @return The new graph
*/
def replaceDependency(oldDep: NodeOrSourceId, newDep: NodeOrSourceId): Graph = {
require ({
val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources
Expand All @@ -129,9 +245,14 @@ class Graph(
(sinkAndDep._1, newSinkDep)
}

new Graph(sources, newSinkDependencies, operators, newDependencies)
copy(sinkDependencies = newSinkDependencies, dependencies = newDependencies)
}

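/**
 * TODO: expand this documentation.
 *
 * Immutably add the contents of another graph to this graph, assigning new ids
 * to the vertices that are brought in.
 *
 * @param graph The other graph whose contents are added to this graph
 * @return A triple containing the new graph, a map from the other graph's source ids
 *         to their new ids, and a map from the other graph's sink ids to their new ids
 */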
def addGraph(graph: Graph): (Graph, Map[SourceId, SourceId], Map[SinkId, SinkId]) = {
// Generate the new ids and mappings of old to new ids
val newSourceIds = nextSourceIds(graph.sources.size)
Expand Down Expand Up @@ -159,11 +280,18 @@ class Graph(
val newSinkDependencies = sinkDependencies ++ graph.sinkDependencies.map {
case (sinkId, dep) => (otherSinkIdMap(sinkId), otherNodeOrSourceIdMap(dep))
}
val newGraph = new Graph(newSources, newSinkDependencies, newOperators, newDependencies)
val newGraph = Graph(newSources, newSinkDependencies, newOperators, newDependencies)

(newGraph, otherSourceIdMap, otherSinkIdMap)
}

/**
* TODO
* @param graph
* @param spliceMap
* @return
*/
// fixme add output source and sink mappings
def connectGraph(graph: Graph, spliceMap: Map[SourceId, SinkId]): Graph = {
require(spliceMap.keys.forall(source => graph.sources.contains(source)),
"Must connect to sources that exist in the other graph")
Expand All @@ -185,6 +313,14 @@ class Graph(
}
}

/**
* TODO
* @param nodesToRemove
* @param replacement
* @param replacementSourceSplice
* @param replacementSinkSplice
* @return
*/
def replaceNodes(
nodesToRemove: Set[NodeId],
replacement: Graph,
Expand Down

0 comments on commit 220d5bc

Please sign in to comment.