From 220d5bcb9e1cff80f9957d682d9d5b84a6f255be Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Tue, 26 Apr 2016 17:10:26 -0700 Subject: [PATCH] progress on adding documentation --- src/main/scala/internal/AnalysisUtils.scala | 60 ++++++- src/main/scala/internal/Graph.scala | 168 ++++++++++++++++++-- 2 files changed, 204 insertions(+), 24 deletions(-) diff --git a/src/main/scala/internal/AnalysisUtils.scala b/src/main/scala/internal/AnalysisUtils.scala index 97da70f7..81873e0f 100644 --- a/src/main/scala/internal/AnalysisUtils.scala +++ b/src/main/scala/internal/AnalysisUtils.scala @@ -1,6 +1,9 @@ package internal object AnalysisUtils { + // util to get ancestors as tree? + // util to topologically sort ancestors? + // FIXME: Linearize won't be deterministic currently def linearize(graph: Graph): Seq[GraphId] = { def linearize(graphId: GraphId): Seq[GraphId] = { val deps: Seq[GraphId] = graphId match { @@ -8,6 +11,7 @@ object AnalysisUtils { case node: NodeId => graph.getDependencies(node) case sink: SinkId => Seq(graph.getSinkDependency(sink)) } + deps.foldLeft(Seq[GraphId]()) { case (linearization, dep) => if (!linearization.contains(dep)) { linearization ++ linearize(dep).filter(id => !linearization.contains(id)) @@ -24,8 +28,18 @@ object AnalysisUtils { } } - def getChildren(graph: Graph, node: GraphId): Set[GraphId] = { - node match { + /** + * Given a graph and a source/sink/node, output the set of all sources/sinks/nodes + * that directly depend on the source/sink/node. + * + * i.e. All ids with a direct dependency on this source/sink/node. + * + * @param graph The graph to use + * @param id The node/sink/source in the graph + * @return The set of all children of that id + */ + def getChildren(graph: Graph, id: GraphId): Set[GraphId] = { + id match { case id: NodeOrSourceId => { val childrenNodes = graph.dependencies.filter(_._2.contains(id)).keySet val childrenSinks = graph.sinkDependencies.filter(_._2 == id).keySet @@ -35,23 +49,53 @@ object AnalysisUtils { } } - def getDescendants(graph: Graph, node: GraphId): Set[GraphId] = { - val children = getChildren(graph, node) + /** + * Given a graph and a source/sink/node, output the set of all sources/sinks/nodes + * that have that source/sink/node as an ancestor. + * + * i.e. All ids that explicitly or implicitly depend on this source/sink/node. + * + * @param graph The graph to use + * @param id The node/sink/source in the graph + * @return The set of all descendents of that id + */ + def getDescendants(graph: Graph, id: GraphId): Set[GraphId] = { + val children = getChildren(graph, id) children.map { child => getDescendants(graph, child) + child }.fold(Set())(_ union _) } - def getParents(graph: Graph, node: GraphId): Set[NodeOrSourceId] = { - node match { + /** + * Given a graph and a source/sink/node, output the set of all nodes + * and sources that are parents of that source/sink/node in the graph. + * + * i.e. All ids that source/sink/node has as a dependency + * + * @param graph The graph to use + * @param id A node/sink/source in the graph + * @return The set of all parents of that id + */ + def getParents(graph: Graph, id: GraphId): Set[NodeOrSourceId] = { + id match { case source: SourceId => Set() case node: NodeId => graph.getDependencies(node).toSet case sink: SinkId => Set(graph.getSinkDependency(sink)) } } - def getAncestors(graph: Graph, node: GraphId): Set[NodeOrSourceId] = { - val parents = getParents(graph, node) + /** + * Given a graph and a source/sink/node, output the set of all nodes + * and sources that are ancestors of that source/sink/node in the graph. + * + * i.e. All ids that source/sink/node has explicit or implicit dependencies on. + * + * @param graph The graph to use + * @param id A node/sink/source in the graph + * @return The set of all ancestors of that id + */ + def getAncestors(graph: Graph, id: GraphId): Set[NodeOrSourceId] = { + val parents = getParents(graph, id) parents.map { parent => getAncestors(graph, parent) + parent }.fold(Set())(_ union _) diff --git a/src/main/scala/internal/Graph.scala b/src/main/scala/internal/Graph.scala index ecf81a5c..a5a27897 100644 --- a/src/main/scala/internal/Graph.scala +++ b/src/main/scala/internal/Graph.scala @@ -1,16 +1,64 @@ package internal -class Graph( - val sources: Set[SourceId], - val sinkDependencies: Map[SinkId, NodeOrSourceId], - val operators: Map[NodeId, Operator], - val dependencies: Map[NodeId, Seq[NodeOrSourceId]] +/** + * To represent our Keystone workloads under the hood at the lowest level, we use a dataflow-esque DAG-like structure. + * Data flows through vertices that manipulate it. At this level, everything is fully untyped! All type information + * as it relates to ML-specific ideas is captured at higher API levels. More specifically, our workload graphs are + * made up of three types of vertices, Nodes, Sources, and Sinks, with edges between them. + * + * Nodes consist of a unique id, and an Operator. Each Node also has zero, one, or multiple ordered dependencies + * on Nodes and Sources elsewhere in the graph. These dependencies are represented as incoming edges in the DAG, + * with data flowing into the node. Dependency order matters, as the incoming data is passed directly to the + * Operator with each dependency as a separate parameter. Operators may take zero, one, or multiple ordered + * input parameters, and always produce a single output. + * + * Next, we have Sources, a special type of vertex. A source is an incomplete input connection coming in from the + * outside world. A workload cannot be executed without providing information about what data should be passed into + * each Source. Sources are also treated by utility methods as endpoints of the DAG, useful for specifying how to + * connect multiple workload DAGs together. Each Source has a unique id, and Sources always have zero + * dependencies/incoming edges. + * + * Finally, we have Sinks, another special type of vertex. Sinks each have a unique id, and exactly one dependency + * on either a source or a node. Sinks are output endpoints exposed to the outside world. They are used as markers + * during execution to specify what results of executing a workload to output. Like Sources, they are also referenced + * by utilities to to connect workloads together. Each Sink has a unique id, and exactly one dependency on either + * a Node or a Source. + * + * @param sources The set of all [[SourceId]]s of sources in the graph + * @param sinkDependencies A map of [[SinkId]] to the id of the node or source the sink depends on + * @param operators A map of [[NodeId]] to the operator contained within that node + * @param dependencies A map of [[NodeId]] to the node's ordered dependencies + */ +case class Graph( + sources: Set[SourceId], + sinkDependencies: Map[SinkId, NodeOrSourceId], + operators: Map[NodeId, Operator], + dependencies: Map[NodeId, Seq[NodeOrSourceId]] ) { + + /** + * Get the set of ids of all nodes in this graph. + */ def nodes: Set[NodeId] = operators.keySet + + /** + * Get the set of ids of all sinks in this graph. + */ def sinks: Set[SinkId] = sinkDependencies.keySet + /** + * Get the dependencies of a given node in this graph. + */ def getDependencies(id: NodeId): Seq[NodeOrSourceId] = dependencies(id) + + /** + * Get the dependency of a given sink in this graph. + */ def getSinkDependency(id: SinkId): NodeOrSourceId = sinkDependencies(id) + + /** + * Get the operator at a given node in this graph. + */ def getOperator(id: NodeId): Operator = operators(id) private def nextNodeIds(num: Int): Seq[NodeId] = { @@ -32,6 +80,13 @@ class Graph( private def nextSourceId(): SourceId = nextSourceIds(1).head private def nextSinkId(): SinkId = nextSinkIds(1).head + /** + * Immutably add a node to this graph. + * + * @param op The operator to be stored at the new node + * @param deps The dependencies of the new node + * @return A pair containing the new graph and the id assigned to the new node + */ def addNode(op: Operator, deps: Seq[NodeOrSourceId]): (Graph, NodeId) = { require ({ val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources @@ -41,9 +96,15 @@ class Graph( val id = nextNodeId() val newOperators = operators.updated(id, op) val newDependencies = dependencies.updated(id, deps) - (new Graph(sources, sinkDependencies, newOperators, newDependencies), id) + (copy(operators = newOperators, dependencies = newDependencies), id) } + /** + * Immutably add a new sink to this graph. + * + * @param dep The dependency of the new sink + * @return A pair containing the new graph and the id assigned to the new sink + */ def addSink(dep: NodeOrSourceId): (Graph, SinkId) = { require ({ val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources @@ -52,15 +113,27 @@ class Graph( val id = nextSinkId() val newSinkDependencies = sinkDependencies.updated(id, dep) - (new Graph(sources, newSinkDependencies, operators, dependencies), id) + (copy(sinkDependencies = newSinkDependencies), id) } + /** + * Immutably add a new source to this graph. + * + * @return A pair containing the new graph and the id assigned to the new source + */ def addSource(): (Graph, SourceId) = { val id = nextSourceId() val newSources = sources + id - (new Graph(newSources, sinkDependencies, operators, dependencies), id) + (copy(sources = newSources), id) } + /** + * Immutably update the dependencies of a node in this graph. + * + * @param node The id of the node to be updated + * @param deps The new dependencies to assign to the node + * @return The new graph + */ def setDependencies(node: NodeId, deps: Seq[NodeOrSourceId]): Graph = { require(dependencies.contains(node), "Node being updated must exist") require ({ @@ -69,16 +142,30 @@ class Graph( }, "Node must have dependencies on existing ids") val newDependencies = dependencies.updated(node, deps) - new Graph(sources, sinkDependencies, operators, newDependencies) + copy(dependencies = newDependencies) } + /** + * Immutably update the operator of a node in this graph. + * + * @param node The id of the node to be updated + * @param op The new operator to assign to the node + * @return The new graph + */ def setOperator(node: NodeId, op: Operator): Graph = { require(dependencies.contains(node), "Node being updated must exist") val newOperators = operators.updated(node, op) - new Graph(sources, sinkDependencies, newOperators, dependencies) + copy(operators = newOperators) } + /** + * Immutably update the dependency of a sink in this graph. + * + * @param sink The id of the sink to be updated + * @param dep The new dependency to assign to the sink + * @return The new graph + */ def setSinkDependency(sink: SinkId, dep: NodeOrSourceId): Graph = { require(sinkDependencies.contains(sink), "Sink being updated must exist") @@ -88,31 +175,60 @@ class Graph( }, "Sink must have dependencies on an existing id") val newSinkDependencies = sinkDependencies.updated(sink, dep) - new Graph(sources, newSinkDependencies, operators, dependencies) + copy(sinkDependencies = newSinkDependencies) } + /** + * Immutably remove a sink from this graph. + * + * @param sink The id of the sink to remove + * @return The new graph + */ def removeSink(sink: SinkId): Graph = { require(sinkDependencies.contains(sink), "Sink being removed must exist") val newSinkDependencies = sinkDependencies - sink - new Graph(sources, newSinkDependencies, operators, dependencies) + copy(sinkDependencies = newSinkDependencies) } + /** + * Immutably remove a source from this graph. + * Note: This may leave invalid dangling dependencies on the deleted source, + * that must be manually taken care of. + * + * @param source The id of the source to remove + * @return The new graph + */ def removeSource(source: SourceId): Graph = { require(sources.contains(source), "Source being removed must exist") val newSources = sources - source - new Graph(newSources, sinkDependencies, operators, dependencies) + copy(sources = newSources) } + /** + * Immutably remove a node from this graph. + * Note: This may leave invalid dangling dependencies on the deleted node, + * that must be manually taken care of. + * + * @param node The id of the node to remove + * @return The new graph + */ def removeNode(node: NodeId): Graph = { require(nodes.contains(node), "Node being removed must exist") val newOperators = operators - node val newDependencies = dependencies - node - new Graph(sources, sinkDependencies, newOperators, newDependencies) + copy(operators = newOperators, dependencies = newDependencies) } + /** + * Immutably replace all dependencies on a given node or source with a new target. + * + * @param oldDep The id of the old node or source that is the existing dependency target + * @param newDep The id of the node or source intended to be the new target + * @return The new graph + */ def replaceDependency(oldDep: NodeOrSourceId, newDep: NodeOrSourceId): Graph = { require ({ val nodesAndSources: Set[NodeOrSourceId] = nodes ++ sources @@ -129,9 +245,14 @@ class Graph( (sinkAndDep._1, newSinkDep) } - new Graph(sources, newSinkDependencies, operators, newDependencies) + copy(sinkDependencies = newSinkDependencies, dependencies = newDependencies) } + /** + * TODO + * @param graph + * @return + */ def addGraph(graph: Graph): (Graph, Map[SourceId, SourceId], Map[SinkId, SinkId]) = { // Generate the new ids and mappings of old to new ids val newSourceIds = nextSourceIds(graph.sources.size) @@ -159,11 +280,18 @@ class Graph( val newSinkDependencies = sinkDependencies ++ graph.sinkDependencies.map { case (sinkId, dep) => (otherSinkIdMap(sinkId), otherNodeOrSourceIdMap(dep)) } - val newGraph = new Graph(newSources, newSinkDependencies, newOperators, newDependencies) + val newGraph = Graph(newSources, newSinkDependencies, newOperators, newDependencies) (newGraph, otherSourceIdMap, otherSinkIdMap) } + /** + * TODO + * @param graph + * @param spliceMap + * @return + */ + // fixme add output source and sink mappings def connectGraph(graph: Graph, spliceMap: Map[SourceId, SinkId]): Graph = { require(spliceMap.keys.forall(source => graph.sources.contains(source)), "Must connect to sources that exist in the other graph") @@ -185,6 +313,14 @@ class Graph( } } + /** + * TODO + * @param nodesToRemove + * @param replacement + * @param replacementSourceSplice + * @param replacementSinkSplice + * @return + */ def replaceNodes( nodesToRemove: Set[NodeId], replacement: Graph,