Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.3 race fix #141

Merged
merged 3 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fim/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#
__VERSION__ = "1.3.2"
__VERSION__ = "1.3.3"

123 changes: 79 additions & 44 deletions fim/graph/networkx_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import tempfile
import networkx as nx
import networkx_query as nxq
from threading import Lock

from .abc_property_graph import ABCPropertyGraph, PropertyGraphImportException, \
PropertyGraphQueryException, ABCGraphImporter, GraphFormat
Expand Down Expand Up @@ -309,14 +310,14 @@ def update_link_properties(self, *, node_a: str, node_b: str, kind: str, props:
def get_all_nodes_by_class(self, *, label: str) -> List[str]:
assert label is not None
my_graph = self.storage.get_graph(self.graph_id)
ret = list()
graph_nodes = list(nxq.search_nodes(my_graph,
{'and': [
{'eq': [ABCPropertyGraph.GRAPH_ID, self.graph_id]},
{'eq': [ABCPropertyGraph.PROP_CLASS,
label]}
]
}))
ret = list()
for n in graph_nodes:
ret.append(my_graph.nodes[n][ABCPropertyGraph.NODE_ID])
return ret
Expand Down Expand Up @@ -567,6 +568,7 @@ def add_link(self, *, node_a: str, rel: str, node_b: str, props: Dict[str, Any]

real_node_a = self._find_node(node_id=node_a, graph_id=self.graph_id)
real_node_b = self._find_node(node_id=node_b, graph_id=self.graph_id)

if props is not None:
self.storage.get_graph(self.graph_id).add_edge(real_node_a, real_node_b, Class=rel, **props)
else:
Expand Down Expand Up @@ -685,70 +687,103 @@ def __init__(self, logger=None):
self.graphs = nx.Graph()
self.start_id = 1
self.log = logger
self.lock = Lock()

def add_graph(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete it so we can replace
self.del_graph(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
self.lock.acquire()
try:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete it so we can replace
self.__del_graph_nl(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
except Exception as e:
raise e
finally:
self.lock.release()

def add_graph_direct(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete so we can replace
self.del_graph(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
try:
# check this graph_id isn't already present
existing_graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(existing_graph_nodes) > 0:
# graph already present, delete so we can replace
self.__del_graph_nl(graph_id)
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, first_label=self.start_id)
self.start_id = self.start_id + len(temp_graph.nodes())
self.graphs.add_nodes_from(temp_graph.nodes(data=True))
self.graphs.add_edges_from(temp_graph.edges(data=True))
except Exception as e:
raise e
finally:
self.lock.release()

def __del_graph_nl(self, graph_id: str) -> None:
# non-locking version
# find all nodes of this graph and remove them
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if graph_nodes is not None and len(graph_nodes) > 0:
self.graphs.remove_nodes_from(graph_nodes)

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
self.__del_graph_nl(graph_id)
self.lock.release()

def extract_graph(self, graph_id: str) -> nx.Graph or None:
# extract copy of graph from store or return None
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(graph_nodes) == 0:
return None
# get adjacency (only gets edges and their properties)
edge_dict = nx.to_dict_of_dicts(self.graphs, graph_nodes)
# create new graph from edges
ret = nx.from_dict_of_dicts(edge_dict)
for n in graph_nodes:
# merge node dictionaries
ret.nodes[n].update(self.graphs.nodes[n])
return ret
self.lock.acquire()
try:
# extract copy of graph from store or return None
graph_nodes = list(nxq.search_nodes(self.graphs, {'eq': [ABCPropertyGraph.GRAPH_ID, graph_id]}))
if len(graph_nodes) == 0:
return None
# get adjacency (only gets edges and their properties)
edge_dict = nx.to_dict_of_dicts(self.graphs, graph_nodes)
# create new graph from edges
ret = nx.from_dict_of_dicts(edge_dict)
for n in graph_nodes:
# merge node dictionaries
ret.nodes[n].update(self.graphs.nodes[n])
return ret
except Exception as e:
raise e
finally:
self.lock.release()

def get_graph(self, graph_id) -> nx.Graph:
# return the store for all graphs (graph_id is ignored)
return self.graphs

def del_all_graphs(self) -> None:
self.lock.acquire()
self.graphs.clear()
self.lock.release()

def add_blank_node_to_graph(self, graph_id, **attrs) -> int:
# add a new node into a graph, return internal
# int id of the added node
self.graphs.add_node(self.start_id, GraphID=graph_id, **attrs)
self.start_id = self.start_id + 1
return self.start_id - 1
self.lock.acquire()
try:
self.graphs.add_node(self.start_id, GraphID=graph_id, **attrs)
self.start_id = self.start_id + 1
return self.start_id - 1
except Exception as e:
raise e
finally:
self.lock.release()

storage_instance = None

Expand Down
124 changes: 84 additions & 40 deletions fim/graph/networkx_property_graph_disjoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@
NetworkX-specific implementation of property graph abstraction
"""
import logging
import threading

import networkx as nx
import networkx_query as nxq
from collections import defaultdict
from threading import Lock

from .abc_property_graph import ABCPropertyGraph, PropertyGraphImportException, PropertyGraphQueryException
from .networkx_property_graph import NetworkXPropertyGraph, NetworkXGraphImporter


def constant_factory(val):
return lambda: val


class NetworkXPropertyGraphDisjoint(NetworkXPropertyGraph):
"""
This class implements most of ABCPropertyGraph functionality.
Expand Down Expand Up @@ -61,7 +70,11 @@ def graph_exists(self) -> bool:
Does the graph with this ID exist?
:return:
"""
return self.storage.graphs.get(self.graph_id, None) is not None
graph_nodes = list(nxq.search_nodes(self.storage.get_graph(self.graph_id),
{'eq': [ABCPropertyGraph.GRAPH_ID, self.graph_id]}))
if graph_nodes is not None and len(graph_nodes) > 0:
return True
return False


class NetworkXGraphStorageDisjoint:
Expand All @@ -76,66 +89,97 @@ class __NetworkXGraphStorage:
"""

def __init__(self, logger=None):
self.graphs = dict()
self.graphs = defaultdict(nx.Graph)
# start ids with 1 in each graph
self.graph_node_ids = defaultdict(constant_factory(1))
self.log = logger
self.lock = threading.Lock()

def add_graph(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
return
# relabel incoming graph nodes to integers, then add
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
# this is needed in case passed in graph is actually a DiGraph
# which graphs produced by yEd tend to be
self.graphs[graph_id] = nx.Graph()
self.graphs[graph_id].add_nodes_from(temp_graph.nodes(data=True))
self.graphs[graph_id].add_edges_from(temp_graph.edges(data=True))
self.lock.acquire()
try:
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
self.lock.release()
return
# relabel incoming graph nodes to integers, then add
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
# set/overwrite GraphID property on all nodes
for n in list(temp_graph.nodes()):
if not temp_graph.nodes[n].get(ABCPropertyGraph.NODE_ID, None):
raise PropertyGraphImportException(graph_id=graph_id,
msg="Some nodes are missing NodeID property, unable to import")
temp_graph.nodes[n][ABCPropertyGraph.GRAPH_ID] = graph_id
# this is needed in case passed in graph is actually a DiGraph
# which graphs produced by yEd tend to be
self.graphs[graph_id] = nx.Graph()
self.graphs[graph_id].add_nodes_from(temp_graph.nodes(data=True))
self.graphs[graph_id].add_edges_from(temp_graph.edges(data=True))
self.graph_node_ids[graph_id] = len(self.graphs[graph_id].nodes()) + 1
except Exception as e:
raise e
finally:
self.lock.release()

def add_graph_direct(self, graph_id: str, graph: nx.Graph) -> None:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
# graph already present, warn and exit
if self.log is not None:
self.log.warn('Attempting to insert a graph with the same GraphID, skipping')
return
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
self.graphs[graph_id] = temp_graph

self.lock.acquire()
try:
# check this graph_id isn't already present
if graph_id in self.graphs.keys():
self.graphs[graph_id].clear()
# relabel incoming graph nodes to integers, then merge
temp_graph = nx.convert_node_labels_to_integers(graph, 1)
self.graphs[graph_id] = temp_graph
self.graph_node_ids[graph_id] = len(self.graphs[graph_id].nodes()) + 1
except Exception as e:
raise e
finally:
self.lock.release()

def del_graph(self, graph_id: str) -> None:
self.lock.acquire()
try:
self.graphs.pop(graph_id)
except KeyError:
pass
if len(self.graphs[graph_id].nodes) > 0:
self.graphs[graph_id].clear()
except Exception as e:
raise e
finally:
self.lock.release()

def extract_graph(self, graph_id: str) -> nx.Graph or None:
graph = self.graphs.get(graph_id, None)
if graph is not None:
return graph.copy()
else:
return None
self.lock.acquire()
graph = self.graphs[graph_id]
self.lock.release()
return graph.copy()

def get_graph(self, graph_id) -> nx.Graph:
# return the store for this graph
return self.graphs.get(graph_id, None)
self.lock.acquire()
ret = self.graphs[graph_id]
self.lock.release()
return ret

def del_all_graphs(self) -> None:
self.lock.acquire()
self.graphs.clear()
self.lock.release()

def add_blank_node_to_graph(self, graph_id, **attrs) -> int:
# add a new node into a graph, return internal
# int id of the added node
new_id = 1 + len(self.graphs[graph_id].nodes())
self.graphs[graph_id].add_node(new_id, GraphID=graph_id, **attrs)
self.lock.acquire()
try:
new_id = self.graph_node_ids[graph_id]
self.graph_node_ids[graph_id] += 1
self.graphs[graph_id].add_node(new_id, GraphID=graph_id, **attrs)
except Exception as e:
raise e
finally:
self.lock.release()
return new_id

storage_instance = None
Expand Down
7 changes: 5 additions & 2 deletions fim/user/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from ..graph.slices.networkx_asm import NetworkxASM
from ..graph.slices.neo4j_asm import Neo4jASM
from ..graph.abc_property_graph import ABCPropertyGraph, GraphFormat
from ..graph.resources.networkx_arm import NetworkXARMGraph, NetworkXGraphImporter
from ..graph.resources.networkx_arm import NetworkXARMGraph
from ..graph.networkx_property_graph import NetworkXGraphImporter
from ..graph.networkx_property_graph_disjoint import NetworkXGraphImporterDisjoint
from ..graph.resources.neo4j_arm import Neo4jARMGraph
from ..slivers.delegations import Delegation, Delegations, Pools, DelegationType, DelegationFormat
from fim.graph.resources.networkx_abqm import NetworkXAggregateBQM, NetworkXABQMFactory
Expand Down Expand Up @@ -81,7 +83,8 @@ def __init__(self, graph_file: str = None, graph_string: str = None, logger=None
importer=NetworkXGraphImporter(logger=logger),
logger=logger)
else:
if isinstance(importer, NetworkXGraphImporter):
if isinstance(importer, NetworkXGraphImporter) or \
isinstance(importer, NetworkXGraphImporterDisjoint):
self.graph_model = NetworkxASM(graph_id=str(uuid.uuid4()),
importer=importer,
logger=logger)
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ rfc3986==1.4.0
six==1.15.0
toml==0.10.2
tqdm==4.54.1
typed-ast==1.4.1
typing-extensions==3.7.4.3
webencodings==0.5.1
wrapt==1.12.1