from collections import OrderedDict, deque

try:
    _string_types = basestring  # Python 2: matches both str and unicode
except NameError:
    _string_types = str  # Python 3: basestring no longer exists


class Graph(object):
    """Directed graph of named nodes.

    Nodes are kept in an ordered name -> node mapping (``self.nodes``) and
    edges are kept as a set of ``(source, target)`` node tuples
    (``self.connections``). Nodes therefore have to be hashable.
    """

    def __init__(self, nodes=None, connections=None):
        """Creates a node graph with connections.

        :Parameters:
            * `nodes` - dictionary with keys as node names and values as nodes
            * `connections` - list of two-item tuples. Each tuple contains
              source and target node or source and target node name.

        Raises `ValueError` if `nodes` is provided but has no ``items()``
        method. Errors raised while adding a node or creating a connection
        are propagated unchanged.
        """
        # Imported here rather than at module level so that this module can
        # be loaded without the rest of the package being importable.
        from brewery.utils import get_logger

        super(Graph, self).__init__()
        self.nodes = OrderedDict()
        self.connections = set()

        self.logger = get_logger()

        self._name_sequence = 1

        if nodes:
            # Only the dictionary check is guarded: a failure inside add()
            # must not be reported as "not a dictionary".
            try:
                items = list(nodes.items())
            except AttributeError:
                raise ValueError("Nodes should be a dictionary, is %s"
                                 % type(nodes))
            for name, node in items:
                self.add(node, name)

        if connections:
            for connection in connections:
                self.connect(connection[0], connection[1])

    def _generate_node_name(self):
        """Generates unique name for a node as ``node`` + sequence number."""
        while True:
            name = "node" + str(self._name_sequence)
            if name not in self.nodes:
                return name
            self._name_sequence += 1

    def add(self, node, name=None):
        """Add a `node` into the graph and return the name it is stored under.

        If `name` is empty or not provided, a unique name is generated as
        ``node`` + sequence number. Raises `KeyError` if a node with the
        given name already exists.
        """
        name = name or self._generate_node_name()

        if name in self.nodes:
            raise KeyError("Node with name %s already exists" % name)

        self.nodes[name] = node
        return name

    def node_name(self, node):
        """Returns name of `node`.

        Raises `ValueError` if `node` is empty, and `Exception` if the node
        is stored under more than one name or is not in the graph at all.
        """
        if not node:
            raise ValueError("No node provided")

        names = [key for key, value in self.nodes.items() if value == node]

        if len(names) == 1:
            return names[0]
        if names:
            raise Exception("There are more references to the same node")
        raise Exception("Can not find node '%s'" % node)

    def node(self, name):
        """Return node with name `name`. Raises `KeyError` if not found."""
        return self.nodes[name]

    def rename_node(self, node, name):
        """Sets a new name for `node`.

        Raises `ValueError` if `name` is empty or there is already a node
        with the same name, and an exception (see `node_name`) if `node` is
        not part of the graph. The renamed node is re-inserted, so it moves
        to the end of the node ordering.
        """
        if not name:
            raise ValueError("No node name provided for rename")
        if name in self.nodes:
            raise ValueError("Node with name '%s' already exists" % name)

        old_name = self.node_name(node)

        del self.nodes[old_name]
        self.nodes[name] = node

    def coalesce_node(self, reference):
        """Coalesce node reference: `reference` should be either a node name
        or a node. Returns the node object.

        Raises `KeyError` for an unknown name and `ValueError` for a node
        object that is not part of the graph.
        """
        if isinstance(reference, _string_types):
            return self.nodes[reference]
        if reference in self.nodes.values():
            return reference
        raise ValueError("Unable to find node '%s'" % reference)

    def remove(self, node):
        """Remove a `node` (node object or node name) from the graph.

        All connections leading to or from the node are removed as well.
        """
        # Allow node name, get the real node object
        if isinstance(node, _string_types):
            name = node
            node = self.nodes[name]
        else:
            name = self.node_name(node)

        del self.nodes[name]

        # Collect first: the set can not be changed while it is iterated.
        obsolete = [conn for conn in self.connections
                    if conn[0] == node or conn[1] == node]
        self.connections.difference_update(obsolete)

    def connect(self, source, target):
        """Connects source node and target node. Nodes can be provided as
        objects or names."""
        connection = (self.coalesce_node(source), self.coalesce_node(target))
        self.connections.add(connection)

    def remove_connection(self, source, target):
        """Remove connection between source and target nodes, if exists."""
        connection = (self.coalesce_node(source), self.coalesce_node(target))
        self.connections.discard(connection)

    def sorted_nodes(self):
        """Return topologically sorted nodes.

        Nodes without incoming connections come first, in the order in which
        they were added; every other node follows once all of its sources
        have been emitted. Targets of one node are emitted in insertion
        order as well, so the result is deterministic.

        Algorithm (in-degree counting variant)::

            L = Empty list that will contain the sorted elements
            S = Queue of all nodes with no incoming edges
            while S is non-empty do
                remove a node n from S
                insert n into L
                for each node m with an edge e from n to m do
                    remove edge e from the graph
                    if m has no other incoming edges then
                        insert m into S
            if graph has edges then
                raise exception: graph has at least one cycle
            else
                return proposed topologically sorted order: L

        Raises `Exception` if the connections contain a cycle.
        """
        # Unique nodes in insertion order (a node may be stored under
        # several names).
        ordered = []
        seen = set()
        for node in self.nodes.values():
            if node not in seen:
                seen.add(node)
                ordered.append(node)

        rank = dict((node, index) for index, node in enumerate(ordered))
        unranked = len(rank)

        def by_rank(node):
            return rank.get(node, unranked)

        # Adjacency lists and number of incoming edges per node
        targets = {}
        incoming = dict((node, 0) for node in ordered)
        for source, target in self.connections:
            targets.setdefault(source, []).append(target)
            incoming[target] = incoming.get(target, 0) + 1

        remaining = len(self.connections)
        queue = deque(node for node in ordered if not incoming[node])
        sorted_nodes = []

        while queue:
            node = queue.popleft()
            sorted_nodes.append(node)

            for target in sorted(targets.get(node, ()), key=by_rank):
                # "remove" the edge node -> target
                remaining -= 1
                incoming[target] -= 1
                if not incoming[target]:
                    queue.append(target)

        # Edges that were never consumed belong to a cycle
        if remaining:
            raise Exception("Stream has at least one cycle "
                            "(%d connections left of %d)"
                            % (remaining, len(self.connections)))

        return sorted_nodes

    def node_targets(self, node):
        """Return nodes that `node` passes data into."""
        node = self.coalesce_node(node)
        return [conn[1] for conn in self.connections if conn[0] == node]

    def node_sources(self, node):
        """Return nodes that provide data for `node`."""
        node = self.coalesce_node(node)
        return [conn[0] for conn in self.connections if conn[1] == node]
* `stream` - another stream or """ - super(Stream, self).__init__() - self.nodes = [] - self.node_dict = {} - self.connections = set() - + super(Stream, self).__init__(nodes, connections) self.logger = get_logger() - if nodes: - try: - for name, node in nodes.items(): - self.add(node, name) - except: - raise StreamError("Nodes should be a dictionary, is %s" % type(nodes)) - - if connections: - for connection in connections: - self.connect(connection[0], connection[1]) - self.exceptions = [] - def node(self, node): - """Returns a node in the stream or node with name. This method is used for coalescing in - other methods, where you can pass either node name or node object. - - Raises KeyError when node does not exist. - - :Parameters: - * `node` - node object or node name - """ - - if type(node) == str or type(node) == unicode: - return self.node_dict[node] - else: - return node - - def add(self, node, name=None): - """Add a `node` into the stream. Does not allow to add named node if node with given name - already exists. """ - if name: - if name in self.node_dict: - raise KeyError("Node with name %s already exists" % name) - self.node_dict[name] = node - - if node not in self.nodes: - self.nodes.append(node) - - def set_node_name(self, node, name): - """Sets a name for `node`. If `name` is ``None`` then node name will be removed. - - Raises an exception if the `node` is not part of the stream or there is already node with - same name. - """ - if node not in self.nodes: - raise KeyError("Node %s does not belong to stream" % node) - - if name: - if name in self.node_dict: - raise KeyError("Node with name %s already exists" % name) - - self.node_dict[name] = node - else: - del self.node_dict[name] - - def remove(self, node): - """Remove a `node` from the stream. 
Also all connections will be removed.""" - - node = self.node(node) - - self.nodes.remove(node) - for (name, current_node) in self.node_dict.items(): - if current_node == node: - del self.node_dict[name] - - to_be_removed = set() - for connection in self.connections: - if connection[0] == node or connection[1] == node: - to_be_removed.add(connection) - - for connection in to_be_removed: - self.connections.remove(connection) - - def connect(self, source, target): - """Connects source node and target node. Nodes can be provided as objects or names.""" - - source_node = self.node(source) - target_node = self.node(target) - self.connections.add((source_node, target_node)) - - def remove_connection(self, source, target): - """Remove connection between source and target nodes, if exists.""" - source_node = self.node(source) - target_node = self.node(target) - - self.connections.discard((source_node, target_node)) - - def sorted_nodes(self): - """ - Return topologically sorted nodes. - - Algorithm:: - - L = Empty list that will contain the sorted elements - S = Set of all nodes with no incoming edges - while S is non-empty do - remove a node n from S - insert n into L - for each node m with an edge e from n to m do - remove edge e from the graph - if m has no other incoming edges then - insert m into S - if graph has edges then - raise exception: graph has at least one cycle - else - return proposed topologically sorted order: L - """ - def is_source(node, connections): - for connection in connections: - if node == connection[1]: - return False - return True - - def source_connections(node, connections): - conns = set() - for connection in connections: - if node == connection[0]: - conns.add(connection) - return conns - - nodes = set(self.nodes) - connections = self.connections.copy() - sorted_nodes = [] - source_nodes = set() - - # Find source nodes: - for node in nodes: - if is_source(node, connections): - source_nodes.add(node) - - # while S is non-empty do - while 
source_nodes: - # remove a node n from S - node = source_nodes.pop() - # insert n into L - sorted_nodes.append(node) - - # for each node m with an edge e from n to m do - s_connections = source_connections(node, connections) - for connection in s_connections: - # remove edge e from the graph - m = connection[1] - connections.remove(connection) - # if m has no other incoming edges then - # insert m into S - if is_source(m, connections): - source_nodes.add(m) - - # if graph has edges then - # output error message (graph has at least one cycle) - # else - # output message (proposed topologically sorted order: L) - - if connections: - raise Exception("Steram has at least one cycle") - - return sorted_nodes - - def fork(self, node=None): + def fork(self): """Creates a construction fork of the stream. Used for constructing streams in functional fashion. Example:: @@ -426,8 +266,7 @@ def fork(self, node=None): To fork a fork, just call ``fork()`` """ - return _StreamFork(self, self.node(node)) - + return _StreamFork(self) def update(self, nodes = None, connections = None): """Adds nodes and connections specified in the dictionary. 
Dictionary might contain @@ -511,21 +350,9 @@ def configure(self, config = {}): # Configure nodes for (node_name, config) in configurations.items(): - node = self.node(node_name) + node = self.coalesce_node(node_name) node.configure(config) - def node_targets(self, node): - """Return nodes that `node` passes data into.""" - node = self.node(node) - nodes =[conn[1] for conn in self.connections if conn[0] == node] - return nodes - - def node_sources(self, node): - """Return nodes that provide data for `node`.""" - node = self.node(node) - nodes =[conn[0] for conn in self.connections if conn[1] == node] - return nodes - def _initialize(self): """Initializes the data processing stream: @@ -593,8 +420,6 @@ def run(self): self._finalize() def _run(self): - - self.logger.info("running stream") threads = [] @@ -678,6 +503,7 @@ def _finalize(self): for node in self.sorted_nodes(): self.logger.debug("finalizing node %s" % node_label(node)) node.finalize() + def node_label(node): """Debug label for a node: node identifier with python object id.""" return "%s(%s)" % (node.identifier() or str(type(node)), id(node)) @@ -799,6 +625,7 @@ def __init__(self, fork, node_class): def __call__(self, *args, **kwargs): node = self.node_class(*args, **kwargs) + print "CALLING %s - %s" % (self.node_class, node) self.fork += node return self.fork