Skip to content

Commit

Permalink
refactor: further refactoring of graph analysing (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Oct 25, 2023
1 parent 14a6e83 commit d0779ac
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 266 deletions.
216 changes: 110 additions & 106 deletions src/dsp_tools/analyse_xml_data/construct_and_analyze_graph.py
Expand Up @@ -6,24 +6,24 @@
import rustworkx as rx
from lxml import etree

from dsp_tools.analyse_xml_data.models import ResptrLink, XMLLink
from dsp_tools.analyse_xml_data.models import Cost, Edge, ResptrLink, XMLLink


def create_info_from_xml_for_graph(
root: etree._Element,
) -> tuple[list[ResptrLink], list[XMLLink], list[str]]:
"""
Create instances of the classes ResptrLink and XMLLink from the root of the XML file.
It adds a reference UUID with which the class instances that represent the links can be linked to the actual
XML elements.
Create link objects (ResptrLink/XMLLink) from the XML file,
and add a reference UUID to each XML element that contains a link (<resptr> or <text>).
With this UUID, the link objects can be identified in the XML data file.
Args:
root: root of the parsed XML file
Returns:
a list of all the resptr links represented in a class instance
a list of all the rich-text links represented in a class instance
a list with all the resource IDs used in the file
- All resptr links contained in the XML file, represented as ResptrLink objects.
- All XML links contained in the XML file, represented as XMLLink objects.
- A list with all resource IDs used in the XML file.
"""
resptr_links = []
xml_links = []
Expand Down Expand Up @@ -87,164 +87,169 @@ def _extract_ids_from_one_text_value(text: etree._Element) -> set[str]:


def make_graph(
resptr_links: list[ResptrLink], xml_links: list[XMLLink], all_resource_ids: list[str]
) -> tuple[rx.PyDiGraph[Any, Any], dict[int, str], list[tuple[int, int, ResptrLink | XMLLink]], set[int]]:
resptr_links: list[ResptrLink],
xml_links: list[XMLLink],
all_resource_ids: list[str],
) -> tuple[rx.PyDiGraph[Any, Any], dict[int, str], list[Edge]]:
"""
This function takes information about the resources (nodes) and links between them (edges).
This function takes information about the resources of an XML file and links between them.
From that it constructs a rustworkx directed graph.
Resources are represented as nodes and links as edges.
Args:
resptr_links: A list of objects representing a direct link (resptr)
between a starting resource and a target resource
xml_links: A list of objects representing one or more links from a single starting resource
to a set of target resources, where all target resources are linked to
from a single text value on the starting resource.
all_resource_ids: IDs of all the resources in the graph
resptr_links: objects representing a direct link between a starting resource and a target resource
xml_links: objects representing one or more links from a single text value of a single starting resource
to a set of target resources
all_resource_ids: IDs of all resources in the graph
Returns:
The rustworkx graph and a dictionary that contains the index number of the nodes with the original resource id
- The rustworkx graph.
- A dictionary that maps the rustworkx index number of the nodes to the original resource ID from the XML file.
- A list with all the edges in the graph.
"""
graph: rx.PyDiGraph[Any, Any] = rx.PyDiGraph() # pylint: disable=no-member
nodes = [(id_, None, None) for id_ in all_resource_ids]
node_indices = list(graph.add_nodes_from(nodes))
node_id_lookup = dict(zip(all_resource_ids, node_indices))
node_index_lookup = dict(zip(node_indices, all_resource_ids))
edges: list[tuple[int, int, ResptrLink | XMLLink]] = [
(node_id_lookup[x.source_id], node_id_lookup[x.target_id], x) for x in resptr_links
]
id_to_node = dict(zip(all_resource_ids, node_indices))
node_to_id = dict(zip(node_indices, all_resource_ids))
edges = [Edge(id_to_node[x.source_id], id_to_node[x.target_id], x) for x in resptr_links]
for xml in xml_links:
edges.extend([(node_id_lookup[xml.source_id], node_id_lookup[x], xml) for x in xml.target_ids])
graph.add_edges_from(edges)
return graph, node_index_lookup, edges, set(node_indices)
edges.extend([Edge(id_to_node[xml.source_id], id_to_node[x], xml) for x in xml.target_ids])
graph.add_edges_from([e.as_tuple() for e in edges])
return graph, node_to_id, edges


def _remove_leaf_nodes(
graph: rx.PyDiGraph[Any, Any],
node_index_lookup: dict[int, str],
node_to_id: dict[int, str],
node_indices: set[int],
) -> tuple[list[str], set[int]]:
"""
Leaf nodes are nodes that do not have any outgoing links.
This means that they have no dependencies and are ok to upload.
This function removes them from the graph and the set with remaining nodes in the graph.
This function removes them from the graph.
Args:
graph: graph
node_index_lookup: the dictionary so that we can find our IDs with the nodes index number from rx
node_indices: The set with the remaining node indices in the graph
node_to_id: mapping of the rustworkx index number of the nodes to the original resource ID from the XML file
node_indices: node indices that are in the graph
Returns:
A list with the ids of the removed leaf nodes
The set with the remaining nodes minus the leaf nodes
- A list with the IDs of the removed leaf nodes.
- A set with the indices of the nodes that remain in the graph.
"""
res: list[str] = []
while leaf_nodes := [x for x in node_indices if graph.out_degree(x) == 0]:
res.extend(node_index_lookup[n] for n in leaf_nodes)
removed_leaf_nodes: list[str] = []
remaining_node_indices = set(node_indices)
while leaf_nodes := [x for x in remaining_node_indices if graph.out_degree(x) == 0]:
removed_leaf_nodes.extend(node_to_id[n] for n in leaf_nodes)
graph.remove_nodes_from(leaf_nodes)
node_indices = node_indices - set(leaf_nodes)
return res, node_indices
remaining_node_indices = remaining_node_indices - set(leaf_nodes)
return removed_leaf_nodes, remaining_node_indices


def _find_cheapest_outgoing_links(
graph: rx.PyDiGraph[Any, Any],
cycle: list[tuple[int, int]],
edge_list: list[tuple[int, int, XMLLink | ResptrLink]],
) -> list[tuple[int, int, XMLLink | ResptrLink]]:
edges: list[Edge],
) -> list[Edge]:
"""
This function searches for the nodes whose outgoing links should be removed in order to break the cycle.
It calculates which links between the resources create the smallest stash.
Args:
graph: graph
cycle: the list with (source, target) for each edge in the cycle
edge_list: list of all the edges that were in the original graph
edges: edges in the graph (contains info about source node, target node, and link info)
Returns:
A list with the links that should be stashed.
It contains all the edges connecting the two nodes.
The edges (i.e. links) that should be stashed: all edges that lead from the source node to the target node of the cheapest link in the cycle
"""
costs = []
costs: list[Cost] = []
for source, target in cycle:
edges_in = graph.in_edges(source)
node_gain = len(edges_in)
edges_out = graph.out_edges(source)
node_cost = sum(x[2].cost_links for x in edges_out)
node_value = node_cost / node_gain
costs.append((source, target, node_value, edges_out))
cheapest_nodes = sorted(costs, key=lambda x: x[2])[0]
cheapest_links = [x for x in edge_list if x[0] == cheapest_nodes[0] and x[1] == cheapest_nodes[1]]
costs.append(Cost(source, target, node_value))
cheapest_cost = sorted(costs, key=lambda x: x.node_value)[0]
cheapest_links = [x for x in edges if x.source == cheapest_cost.source and x.target == cheapest_cost.target]
return cheapest_links


def _remove_edges_to_stash(
graph: rx.PyDiGraph[Any, Any],
edges_to_remove: list[tuple[int, int, XMLLink | ResptrLink]],
edge_list: list[tuple[int, int, XMLLink | ResptrLink]],
edges_to_remove: list[Edge],
all_edges: list[Edge],
remaining_nodes: set[int],
) -> list[XMLLink | ResptrLink]:
) -> None:
"""
This function removes the edges from the graph in order to break a cycle.
It returns the information that enables us to identify the links in the real data.
Args:
graph: graph
edges_to_remove: list of all the edges that should be removed
edge_list: list of all the edges in the original graph
remaining_nodes: set with the indexes of the nodes in the graph
Returns:
a list with the class instances that represent links to stash
edges_to_remove: edges that should be removed
all_edges: all edges in the original graph
remaining_nodes: indices of the nodes in the graph
"""
source, target = edges_to_remove[0][0], edges_to_remove[0][1]
links_to_stash = [x[2] for x in edges_to_remove]
# if only one (source, target) is entered, it removes only one edge, not all
# therefore we need as many entries in the list as there are edges between the source and target to break the cycle
to_remove_list = [(x[0], x[1]) for x in edges_to_remove]
phantom_links = []
for instance in links_to_stash:
if isinstance(instance, XMLLink):
phantom_links.extend(_find_phantom_xml_edges(source, target, edge_list, instance, remaining_nodes))
to_remove_list.extend(phantom_links)
graph.remove_edges_from(to_remove_list)
return links_to_stash
normal_edges_to_remove = [(x.source, x.target) for x in edges_to_remove]
# rustworkx removes only one edge per (source, target) entry, even if several parallel edges connect the two nodes;
# therefore the list needs one entry for every edge between the source and the target

phantom_edges_to_remove = []
source, target = edges_to_remove[0].source, edges_to_remove[0].target
for link_to_stash in [x.link_object for x in edges_to_remove]:
if isinstance(link_to_stash, XMLLink):
phantom_edges_to_remove.extend(
_find_phantom_xml_edges(source, target, all_edges, link_to_stash, remaining_nodes)
)

all_edges_to_remove = normal_edges_to_remove + phantom_edges_to_remove
graph.remove_edges_from(all_edges_to_remove)


def _find_phantom_xml_edges(
source: int,
target: int,
edge_list: list[tuple[int, int, XMLLink | ResptrLink]],
xml_instance: XMLLink,
source_node_index: int,
target_node_index: int,
all_edges: list[Edge],
xml_link_to_stash: XMLLink,
remaining_nodes: set[int],
) -> list[tuple[int, int]]:
"""
If an edge that will be removed represents an XML link
the link may contain further links to other resources.
If we stash the XMLLink then in the real data all the links are stashed.
This is not automatically the case in the rx graph.
We identify all the edges that need to be removed so that the rx graph represents the links that remain
in the real data.
If an edge that will be removed represents an XML link,
the text value may contain further links to other resources.
If we stash the XMLLink, then in the real data all links of that text value are stashed.
So, these "phantom" links must be removed from the graph.
This function identifies the edges that must be removed from the rx graph.
Args:
source: index of source node
target: index of target node
edge_list: list of all the edges in the original graph
xml_instance: class instance that will be stashed
remaining_nodes: indexes of all the nodes in the graph
source_node_index: rustworkx index of source node
target_node_index: rustworkx index of target node
all_edges: all edges in the original graph
xml_link_to_stash: XML link that will be stashed
remaining_nodes: indices of all nodes in the graph
Returns:
list with edges that represent the links in the original XML text
edges (rustworkx indices of nodes) that represent the links in the original XML text
"""

def check(x: tuple[int, int, XMLLink | ResptrLink]) -> bool:
# if we do not check if the target is in the remaining_nodes (maybe removed because of leaf node)
# we would get a NoEdgeBetweenNodes error
return x[0] == source and x[1] != target and x[2] == xml_instance and x[1] in remaining_nodes
def check(x: Edge) -> bool:
return all(
(
x.source == source_node_index,
x.target != target_node_index,
x.link_object == xml_link_to_stash,
x.target in remaining_nodes,
# the target could have been removed because it was a leaf node, so we must check if it is still there
)
)

return [(x[0], x[1]) for x in edge_list if check(x)]
return [(x.source, x.target) for x in all_edges if check(x)]


def _add_stash_to_lookup_dict(
stash_dict: dict[str, list[str]], links_to_stash: list[XMLLink | ResptrLink]
stash_dict: dict[str, list[str]],
links_to_stash: list[XMLLink | ResptrLink],
) -> dict[str, list[str]]:
stash_list = [stash_link.link_uuid for stash_link in links_to_stash]
# all stashed links have the same subject id, so we can just take the first one
Expand All @@ -258,40 +263,39 @@ def _add_stash_to_lookup_dict(

def generate_upload_order(
graph: rx.PyDiGraph[Any, Any],
node_index_lookup: dict[int, str],
edge_list: list[tuple[int, int, XMLLink | ResptrLink]],
node_indices: set[int],
node_to_id: dict[int, str],
edges: list[Edge],
) -> tuple[dict[str, list[str]], list[str], int]:
"""
Generate the order in which the resources should be uploaded to the DSP-API based on the dependencies.
Args:
graph: graph
node_index_lookup: mapping between graph indices and original IDs
edge_list: list of edges in the graph as tuple (source node, target node, link info)
node_indices: index numbers of the nodes still in the graph
node_to_id: mapping between indices of the graph nodes and original resource IDs from the XML file
edges: edges in the graph (contains info about source node, target node, and link info)
Returns:
A dictionary which maps the resources that have stashes to the UUIDs of the stashed links.
A list of resource IDs which gives the order in which the resources should be uploaded to DSP-API.
The number of links in the stash.
- A dictionary which maps the resources that have stashes to the UUIDs of the stashed links.
- A list of resource IDs which gives the order in which the resources should be uploaded to DSP-API.
- The number of links in the stash.
"""
upload_order: list[str] = []
stash_lookup: dict[str, list[str]] = {}
leaf_nodes, node_indices = _remove_leaf_nodes(graph, node_index_lookup, node_indices)
node_indices = set(node_to_id.keys())
leaf_nodes, remaining_node_indices = _remove_leaf_nodes(graph, node_to_id, node_indices)
upload_order.extend(leaf_nodes)
stash_counter = 0
while node_indices:
while remaining_node_indices:
cycle = list(rx.digraph_find_cycle(graph)) # type: ignore[attr-defined] # pylint: disable=no-member
links_to_remove = _find_cheapest_outgoing_links(graph, cycle, edge_list)
links_to_remove = _find_cheapest_outgoing_links(graph, cycle, edges)
stash_counter += len(links_to_remove)
links_to_stash = _remove_edges_to_stash(
_remove_edges_to_stash(
graph=graph,
edges_to_remove=links_to_remove,
edge_list=edge_list,
remaining_nodes=node_indices,
all_edges=edges,
remaining_nodes=remaining_node_indices,
)
stash_lookup = _add_stash_to_lookup_dict(stash_lookup, links_to_stash)
leaf_nodes, node_indices = _remove_leaf_nodes(graph, node_index_lookup, node_indices)
stash_lookup = _add_stash_to_lookup_dict(stash_lookup, [x.link_object for x in links_to_remove])
leaf_nodes, remaining_node_indices = _remove_leaf_nodes(graph, node_to_id, remaining_node_indices)
upload_order.extend(leaf_nodes)
return stash_lookup, upload_order, stash_counter
38 changes: 36 additions & 2 deletions src/dsp_tools/analyse_xml_data/models.py
Expand Up @@ -9,7 +9,7 @@ class ResptrLink:
"""
This class represents a direct link (resptr) between a starting resource and a target resource.
Args:
Attributes:
source_id: ID of the resource from which the link originates
target_id: ID of the resource where the link points to
link_uuid: identifier of this link
Expand All @@ -31,7 +31,7 @@ class XMLLink:
This class represents one or more links from a single starting resource to a set of target resources,
where all target resources are linked to from a single text value of the starting resource.
Args:
Attributes:
source_id: ID of the resource from which the link(s) originate
target_ids: IDs of the resources that are referenced in the text value
link_uuid: identifier of this link
Expand All @@ -45,3 +45,37 @@ class XMLLink:
def cost_links(self) -> float:
"""The cost of this outgoing link (1 / number of links in the XML text)"""
return 1 / len(self.target_ids)


@dataclass(frozen=True)
class Edge:
    """
    A single directed edge of the rustworkx graph, together with the link it stands for.

    Attributes:
        source: rustworkx index of the node (resource) where the link starts
        target: rustworkx index of the node (resource) that the link points to
        link_object: the link (ResptrLink or XMLLink) that connects source and target
    """

    source: int
    target: int
    link_object: ResptrLink | XMLLink

    def as_tuple(self) -> tuple[int, int, ResptrLink | XMLLink]:
        """Return this edge as a tuple of (source index, target index, link object)."""
        return (self.source, self.target, self.link_object)


@dataclass(frozen=True)
class Cost:
    """
    This class holds the cost-gain ratio of stashing the link between one source node and one target node.

    Attributes:
        source: rustworkx index of the resource from which the link originates
        target: rustworkx index of the resource where the link points to
        node_value: cost-gain ratio if this link is stashed
    """

    source: int
    target: int
    node_value: float

0 comments on commit d0779ac

Please sign in to comment.