This repository has been archived by the owner on Apr 22, 2024. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
separated graph related stuff to a new module and class graph.Graph -…
… will be beneficial in the future for stream optimization magic and abstraction (like SQL-only parts and callable chaining,…)
- Loading branch information
Showing
2 changed files
with
221 additions
and
181 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
from collections import OrderedDict | ||
from brewery.utils import get_logger | ||
|
||
class Graph(object): | ||
"""Data processing stream""" | ||
def __init__(self, nodes=None, connections=None): | ||
"""Creates a node graph with connections. | ||
:Parameters: | ||
* `nodes` - dictionary with keys as node names and values as nodes | ||
* `connections` - list of two-item tuples. Each tuple contains source and target node | ||
or source and target node name. | ||
""" | ||
|
||
super(Graph, self).__init__() | ||
self.nodes = OrderedDict() | ||
self.connections = set() | ||
|
||
self.logger = get_logger() | ||
|
||
self._name_sequence = 1 | ||
|
||
if nodes: | ||
try: | ||
for name, node in nodes.items(): | ||
self.add(node, name) | ||
except: | ||
raise ValueError("Nodes should be a dictionary, is %s" % type(nodes)) | ||
|
||
if connections: | ||
for connection in connections: | ||
self.connect(connection[0], connection[1]) | ||
|
||
def _generate_node_name(self): | ||
"""Generates unique name for a node""" | ||
while 1: | ||
name = "node" + str(self._name_sequence) | ||
if name not in self.nodes.keys(): | ||
break | ||
self._name_sequence += 1 | ||
|
||
return name | ||
|
||
def add(self, node, name=None): | ||
"""Add a `node` into the stream. Does not allow to add named node if | ||
node with given name already exists. Generate node name if not | ||
provided. Node name is generated as ``node`` + sequence number. | ||
Uniqueness is tested.""" | ||
|
||
name = name or self._generate_node_name() | ||
|
||
if name in self.nodes: | ||
raise KeyError("Node with name %s already exists" % name) | ||
|
||
self.nodes[name] = node | ||
|
||
return name | ||
|
||
def node_name(self, node): | ||
"""Returns name of `node`.""" | ||
# There should not be more | ||
if not node: | ||
raise ValueError("No node provided") | ||
|
||
names = [key for key,value in self.nodes.items() if value==node] | ||
|
||
if len(names) == 1: | ||
return names[0] | ||
elif len(names) > 1: | ||
raise Exception("There are more references to the same node") | ||
else: # if len(names) == 0 | ||
raise Exception("Can not find node '%s'" % node) | ||
|
||
def node(self, name): | ||
"""Return node with name `name`.""" | ||
return self.nodes[name] | ||
|
||
def rename_node(self, node, name): | ||
"""Sets a name for `node`. Raises an exception if the `node` is not | ||
part of the stream, if `name` is empty or there is already node with | ||
the same name. """ | ||
|
||
if not name: | ||
raise ValueError("No node name provided for rename") | ||
if name in self.nodes(): | ||
raise ValueError("Node with name '%s' already exists" % name) | ||
|
||
old_name = node_name(node) | ||
|
||
del self.nodes[old_name] | ||
self.nodes[name] = node | ||
|
||
def coalesce_node(self, reference): | ||
"""Coalesce node reference: `reference` should be either a node name | ||
or a node. Returns the node object.""" | ||
|
||
if isinstance(reference, basestring): | ||
return self.nodes[reference] | ||
elif reference in self.nodes.values(): | ||
return reference | ||
else: | ||
raise ValueError("Unable to find node '%s'" % reference) | ||
|
||
def remove(self, node): | ||
"""Remove a `node` from the stream. Also all connections will be | ||
removed.""" | ||
|
||
# Allow node name, get the real node object | ||
if isinstance(node, basestring): | ||
name = node | ||
node = self.nodes[name] | ||
else: | ||
name = self.node_name(node) | ||
|
||
del self.nodes[name] | ||
|
||
to_be_removed = set() | ||
remove = [c for c in self.connections if c[0] == node or c[1] == node] | ||
|
||
for connection in remove: | ||
self.connections.remove(connection) | ||
|
||
def connect(self, source, target): | ||
"""Connects source node and target node. Nodes can be provided as | ||
objects or names.""" | ||
connection = (self.coalesce_node(source), self.coalesce_node(target)) | ||
self.connections.add(connection) | ||
|
||
def remove_connection(self, source, target): | ||
"""Remove connection between source and target nodes, if exists.""" | ||
|
||
connection = (self.coalesce_node(source), self.coalesce_node(target)) | ||
self.connections.discard(connection) | ||
|
||
def sorted_nodes(self): | ||
""" | ||
Return topologically sorted nodes. | ||
Algorithm:: | ||
L = Empty list that will contain the sorted elements | ||
S = Set of all nodes with no incoming edges | ||
while S is non-empty do | ||
remove a node n from S | ||
insert n into L | ||
for each node m with an edge e from n to m do | ||
remove edge e from the graph | ||
if m has no other incoming edges then | ||
insert m into S | ||
if graph has edges then | ||
raise exception: graph has at least one cycle | ||
else | ||
return proposed topologically sorted order: L | ||
""" | ||
def is_source(node, connections): | ||
for connection in connections: | ||
if node == connection[1]: | ||
return False | ||
return True | ||
|
||
def source_connections(node, connections): | ||
conns = set() | ||
for connection in connections: | ||
if node == connection[0]: | ||
conns.add(connection) | ||
return conns | ||
|
||
nodes = set(self.nodes.values()) | ||
connections = self.connections.copy() | ||
sorted_nodes = [] | ||
|
||
# Find source nodes: | ||
source_nodes = set([n for n in nodes if is_source(n, connections)]) | ||
|
||
# while S is non-empty do | ||
while source_nodes: | ||
# remove a node n from S | ||
node = source_nodes.pop() | ||
# insert n into L | ||
sorted_nodes.append(node) | ||
|
||
# for each node m with an edge e from n to m do | ||
s_connections = source_connections(node, connections) | ||
for connection in s_connections: | ||
# remove edge e from the graph | ||
m = connection[1] | ||
connections.remove(connection) | ||
# if m has no other incoming edges then | ||
# insert m into S | ||
if is_source(m, connections): | ||
source_nodes.add(m) | ||
|
||
# if graph has edges then | ||
# output error message (graph has at least one cycle) | ||
# else | ||
# output message (proposed topologically sorted order: L) | ||
|
||
if connections: | ||
raise Exception("Steram has at least one cycle (%d connections left of %d)" % (len(connections), len(self.connections))) | ||
|
||
return sorted_nodes | ||
|
||
def node_targets(self, node): | ||
"""Return nodes that `node` passes data into.""" | ||
node = self.coalesce_node(node) | ||
nodes =[conn[1] for conn in self.connections if conn[0] == node] | ||
return nodes | ||
|
||
def node_sources(self, node): | ||
"""Return nodes that provide data for `node`.""" | ||
node = self.coalesce_node(node) | ||
nodes =[conn[0] for conn in self.connections if conn[1] == node] | ||
return nodes |
Oops, something went wrong.