Skip to content

Commit

Permalink
Merge pull request #141 from fabric-testbed/1.3-race-fix
Browse files Browse the repository at this point in the history
1.3 race fix
  • Loading branch information
ibaldin committed Oct 7, 2022
2 parents ce2631f + 8956ff4 commit 9588456
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 88 deletions.
2 changes: 1 addition & 1 deletion fim/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#
__VERSION__ = "1.3.2"
__VERSION__ = "1.3.3"

123 changes: 79 additions & 44 deletions fim/graph/networkx_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import tempfile
import networkx as nx
import networkx_query as nxq
from threading import Lock

from .abc_property_graph import ABCPropertyGraph, PropertyGraphImportException, \
PropertyGraphQueryException, ABCGraphImporter, GraphFormat
Expand Down Expand Up @@ -309,14 +310,14 @@ def update_link_properties(self, *, node_a: str, node_b: str, kind: str, props:
def get_all_nodes_by_class(self, *, label: str) -> List[str]:
assert label is not None
my_graph = self.storage.get_graph(self.graph_id)
ret = list()
graph_nodes = list(nxq.search_nodes(my_graph,
{'and': [
{'eq': [ABCPropertyGraph.GRAPH_ID, self.graph_id]},
{'eq': [ABCPropertyGraph.PROP_CLASS,
label]}
]
}))
ret = list()
for n in graph_nodes:
ret.append(my_graph.nodes[n][ABCPropertyGraph.NODE_ID])
return ret
Expand Down Expand Up @@ -567,6 +568,7 @@ def add_link(self, *, node_a: str, rel: str, node_b: str, props: Dict[str, Any]

real_node_a = self._find_node(node_id=node_a, graph_id=self.graph_id)
real_node_b = self._find_node(node_id=node_b, graph_id=self.graph_id)

if props is not None:
self.storage.get_graph(self.graph_id).add_edge(real_node_a, real_node_b, Class=rel, **props)
else:
Expand Down Expand Up @@ -685,70 +687,103 @@ def __init__(self, logger=None):
self.graphs = nx.Graph()
self.start_id = 1
self.log = logger
self.lock = Lock()

def add_graph(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete it so we can replace
self.del_graph(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
self.lock.acquire()
try:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete it so we can replace
self.__del_graph_nl(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
except Exception as e:
raise e
finally:
self.lock.release()

def add_graph_direct(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete so we can replace
self.del_graph(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
try:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete so we can replace
self.__del_graph_nl(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
except Exception as e:
raise e
finally:
self.lock.release()

def __del_graph_nl(self, graph_id: str) -> None:
# non-locking version
# find all nodes of this graph and remove them
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if graph_nodes is not None and len(graph_nodes) > 0:
self.graphs.remove_nodes_from(graph_nodes)

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
self.__del_graph_nl(graph_id)
self.lock.release()

def extract_graph(self, graph_id: str) -> nx.Graph or None:
# extract copy of graph from store or return None
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(graph_nodes) == 0:
return None
# get adjacency (only gets edges and their properties)
edge_dict = nx.to_dict_of_dicts(self.graphs, graph_nodes)
# create new graph from edges
ret = nx.from_dict_of_dicts(edge_dict)
for n in graph_nodes:
# merge node dictionaries
ret.nodes[n].update(self.graphs.nodes[n])
return ret
self.lock.acquire()
try:
# extract copy of graph from store or return None
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(graph_nodes) == 0:
return None
# get adjacency (only gets edges and their properties)
edge_dict = nx.to_dict_of_dicts(self.graphs, graph_nodes)
# create new graph from edges
ret = nx.from_dict_of_dicts(edge_dict)
for n in graph_nodes:
# merge node dictionaries
ret.nodes[n].update(self.graphs.nodes[n])
return ret
except Exception as e:
raise e
finally:
self.lock.release()

def get_graph(self, graph_id) -> nx.Graph:
# return the store for all graphs (graph_id is ignored)
return self.graphs

def del_all_graphs(self) -> None:
self.lock.acquire()
self.graphs.clear()
self.lock.release()

def add_blank_node_to_graph(self, graph_id, **attrs) -> int:
# add a new node into a graph, return internal
# int id of the added node
self.graphs.add_node(self.start_id, GraphID=graph_id, **attrs)
self.start_id = self.start_id + 1
return self.start_id - 1
self.lock.acquire()
try:
self.graphs.add_node(self.start_id, GraphID=graph_id, **attrs)
self.start_id = self.start_id + 1
return self.start_id - 1
except Exception as e:
raise e
finally:
self.lock.release()

storage_instance = None

Expand Down
124 changes: 84 additions & 40 deletions fim/graph/networkx_property_graph_disjoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@
NetworkX-specific implementation of property graph abstraction
"""
import logging
import threading

import networkx as nx
import networkx_query as nxq
from collections import defaultdict
from threading import Lock

from .abc_property_graph import ABCPropertyGraph, PropertyGraphImportException, PropertyGraphQueryException
from .networkx_property_graph import NetworkXPropertyGraph, NetworkXGraphImporter


def constant_factory(val):
return lambda: val


class NetworkXPropertyGraphDisjoint(NetworkXPropertyGraph):
"""
This class implements most of ABCPropertyGraph functionality.
Expand Down Expand Up @@ -61,7 +70,11 @@ def graph_exists(self) -> bool:
Does the graph with this ID exist?
:return:
"""
return self.storage.graphs.get(self.graph_id, None) is not None
graph_nodes = list(nxq.search_nodes(self.storage.get_graph(self.graph_id),
{'eq': [ABCPropertyGraph.GRAPH_ID, self.graph_id]}))
if graph_nodes is not None and len(graph_nodes) > 0:
return True
return False


class NetworkXGraphStorageDisjoint:
Expand All @@ -76,66 +89,97 @@ class __NetworkXGraphStorage:
"""

def __init__(self, logger=None):
self.graphs = dict()
self.graphs = defaultdict(nx.Graph)
# start ids with 1 in each graph
self.graph_node_ids = defaultdict(constant_factory(1))
self.log = logger
self.lock = threading.Lock()

def add_graph(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
return
# relabel incoming graph nodes to integers, then add
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
# this is needed in case passed in graph is actually a DiGraph
# which graphs produced by yEd tend to be
self.graphs[graph_id] = nx.Graph()
self.graphs[graph_id].add_nodes_from(temp_graph.nodes(data=True))
self.graphs[graph_id].add_edges_from(temp_graph.edges(data=True))
self.lock.acquire()
try:
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
self.lock.release()
return
# relabel incoming graph nodes to integers, then add
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
# this is needed in case passed in graph is actually a DiGraph
# which graphs produced by yEd tend to be
self.graphs[graph_id] = nx.Graph()
self.graphs[graph_id].add_nodes_from(temp_graph.nodes(data=True))
self.graphs[graph_id].add_edges_from(temp_graph.edges(data=True))
self.graph_node_ids[graph_id] = len(self.graphs[graph_id].nodes()) + 1
except Exception as e:
raise e
finally:
self.lock.release()

def add_graph_direct(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
return
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
self.graphs[graph_id] = temp_graph

self.lock.acquire()
try:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
self.graphs[graph_id].clear()
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
self.graphs[graph_id] = temp_graph
self.graph_node_ids[graph_id] = len(self.graphs[graph_id].nodes()) + 1
except Exception as e:
raise e
finally:
self.lock.release()

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
try:
self.graphs.pop(graph_id)
except KeyError:
pass
if len(self.graphs[graph_id].nodes) > 0:
self.graphs[graph_id].clear()
except Exception as e:
raise e
finally:
self.lock.release()

def extract_graph(self, graph_id: str) -> nx.Graph or None:
graph = self.graphs.get(graph_id, None)
if graph is not None:
return graph.copy()
else:
return None
self.lock.acquire()
graph = self.graphs[graph_id]
self.lock.release()
return graph.copy()

def get_graph(self, graph_id) -> nx.Graph:
# return the store for this graph
return self.graphs.get(graph_id, None)
self.lock.acquire()
ret = self.graphs[graph_id]
self.lock.release()
return ret

def del_all_graphs(self) -> None:
self.lock.acquire()
self.graphs.clear()
self.lock.release()

def add_blank_node_to_graph(self, graph_id, **attrs) -> int:
# add a new node into a graph, return internal
# int id of the added node
new_id = 1 + len(self.graphs[graph_id].nodes())
self.graphs[graph_id].add_node(new_id, GraphID=graph_id, **attrs)
self.lock.acquire()
try:
new_id = self.graph_node_ids[graph_id]
self.graph_node_ids[graph_id] += 1
self.graphs[graph_id].add_node(new_id, GraphID=graph_id, **attrs)
except Exception as e:
raise e
finally:
self.lock.release()
return new_id

storage_instance = None
Expand Down
7 changes: 5 additions & 2 deletions fim/user/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from ..graph.slices.networkx_asm import NetworkxASM
from ..graph.slices.neo4j_asm import Neo4jASM
from ..graph.abc_property_graph import ABCPropertyGraph, GraphFormat
from ..graph.resources.networkx_arm import NetworkXARMGraph, NetworkXGraphImporter
from ..graph.resources.networkx_arm import NetworkXARMGraph
from ..graph.networkx_property_graph import NetworkXGraphImporter
from ..graph.networkx_property_graph_disjoint import NetworkXGraphImporterDisjoint
from ..graph.resources.neo4j_arm import Neo4jARMGraph
from ..slivers.delegations import Delegation, Delegations, Pools, DelegationType, DelegationFormat
from fim.graph.resources.networkx_abqm import NetworkXAggregateBQM, NetworkXABQMFactory
Expand Down Expand Up @@ -81,7 +83,8 @@ def __init__(self, graph_file: str = None, graph_string: str = None, logger=None
importer=NetworkXGraphImporter(logger=logger),
logger=logger)
else:
if isinstance(importer, NetworkXGraphImporter):
if isinstance(importer, NetworkXGraphImporter) or \
isinstance(importer, NetworkXGraphImporterDisjoint):
self.graph_model = NetworkxASM(graph_id=str(uuid.uuid4()),
importer=importer,
logger=logger)
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ rfc3986==1.4.0
six==1.15.0
toml==0.10.2
tqdm==4.54.1
typed-ast==1.4.1
typing-extensions==3.7.4.3
webencodings==0.5.1
wrapt==1.12.1

0 comments on commit 9588456

Please sign in to comment.