Permalink
Fetching contributors…
Cannot retrieve contributors at this time
581 lines (485 sloc) 19.7 KB
# -*- coding: utf-8 -*-
"""
A collection of utilities for canonicalizing and inspecting graphs.
Among other things, they solve of the problem of deterministic bnode
comparisons.
Warning: the time to canonicalize bnodes may increase exponentially on
degenerate larger graphs. Use with care!
Example of comparing two graphs::
>>> g1 = Graph().parse(format='n3', data='''
... @prefix : <http://example.org/ns#> .
... <http://example.org> :rel
... <http://example.org/same>,
... [ :label "Same" ],
... <http://example.org/a>,
... [ :label "A" ] .
... ''')
>>> g2 = Graph().parse(format='n3', data='''
... @prefix : <http://example.org/ns#> .
... <http://example.org> :rel
... <http://example.org/same>,
... [ :label "Same" ],
... <http://example.org/b>,
... [ :label "B" ] .
... ''')
>>>
>>> iso1 = to_isomorphic(g1)
>>> iso2 = to_isomorphic(g2)
These are not isomorphic::
>>> iso1 == iso2
False
Diff the two graphs::
>>> in_both, in_first, in_second = graph_diff(iso1, iso2)
Present in both::
>>> def dump_nt_sorted(g):
... for l in sorted(g.serialize(format='nt').splitlines()):
... if l: print(l.decode('ascii'))
>>> dump_nt_sorted(in_both) #doctest: +SKIP
<http://example.org>
<http://example.org/ns#rel> <http://example.org/same> .
<http://example.org>
<http://example.org/ns#rel> _:cbcaabaaba17fecbc304a64f8edee4335e .
_:cbcaabaaba17fecbc304a64f8edee4335e
<http://example.org/ns#label> "Same" .
Only in first::
>>> dump_nt_sorted(in_first) #doctest: +SKIP
<http://example.org>
<http://example.org/ns#rel> <http://example.org/a> .
<http://example.org>
<http://example.org/ns#rel> _:cb124e4c6da0579f810c0ffe4eff485bd9 .
_:cb124e4c6da0579f810c0ffe4eff485bd9
<http://example.org/ns#label> "A" .
Only in second::
>>> dump_nt_sorted(in_second) #doctest: +SKIP
<http://example.org>
<http://example.org/ns#rel> <http://example.org/b> .
<http://example.org>
<http://example.org/ns#rel> _:cb558f30e21ddfc05ca53108348338ade8 .
_:cb558f30e21ddfc05ca53108348338ade8
<http://example.org/ns#label> "B" .
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# TODO:
# - Doesn't handle quads.
# - Add warning and/or safety mechanism before working on large graphs?
# - use this in existing Graph.isomorphic?
__all__ = ['IsomorphicGraph', 'to_isomorphic', 'isomorphic',
'to_canonical_graph', 'graph_diff', 'similar']
from rdflib.graph import Graph, ConjunctiveGraph, ReadOnlyGraphAggregate
from rdflib.term import BNode, Node
from hashlib import sha256
from datetime import datetime
from collections import defaultdict
from six import text_type
def _total_seconds(td):
result = td.days * 24 * 60 * 60
result += td.seconds
result += td.microseconds / 1000000.0
return result
class _runtime(object):
def __init__(self, label):
self.label = label
def __call__(self, f):
if self.label is None:
self.label = f.__name__ + "_runtime"
def wrapped_f(*args, **kwargs):
start = datetime.now()
result = f(*args, **kwargs)
if 'stats' in kwargs and kwargs['stats'] is not None:
stats = kwargs['stats']
stats[self.label] = _total_seconds(datetime.now() - start)
return result
return wrapped_f
class _call_count(object):
def __init__(self, label):
self.label = label
def __call__(self, f):
if self.label is None:
self.label = f.__name__ + "_runtime"
def wrapped_f(*args, **kwargs):
if 'stats' in kwargs and kwargs['stats'] is not None:
stats = kwargs['stats']
if self.label not in stats:
stats[self.label] = 0
stats[self.label] += 1
return f(*args, **kwargs)
return wrapped_f
class IsomorphicGraph(ConjunctiveGraph):
"""An implementation of the RGDA1 graph digest algorithm.
An implementation of RGDA1 (publication below),
a combination of Sayers & Karp's graph digest algorithm using
sum and SHA-256 <http://www.hpl.hp.com/techreports/2003/HPL-2003-235R1.pdf>
and traces <http://pallini.di.uniroma1.it>, an average case
polynomial time algorithm for graph canonicalization.
McCusker, J. P. (2015). WebSig: A Digital Signature Framework for the Web.
Rensselaer Polytechnic Institute, Troy, NY.
http://gradworks.umi.com/3727015.pdf
"""
def __init__(self, **kwargs):
super(IsomorphicGraph, self).__init__(**kwargs)
def __eq__(self, other):
"""Graph isomorphism testing."""
if not isinstance(other, IsomorphicGraph):
return False
elif len(self) != len(other):
return False
return self.internal_hash() == other.internal_hash()
def __ne__(self, other):
"""Negative graph isomorphism testing."""
return not self.__eq__(other)
def __hash__(self):
return super(IsomorphicGraph, self).__hash__()
def graph_digest(self, stats=None):
"""Synonym for IsomorphicGraph.internal_hash."""
return self.internal_hash(stats=stats)
def internal_hash(self, stats=None):
"""
This is defined instead of __hash__ to avoid a circular recursion
scenario with the Memory store for rdflib which requires a hash lookup
in order to return a generator of triples.
"""
return _TripleCanonicalizer(self).to_hash(stats=stats)
class Color:
def __init__(self, nodes, hashfunc, color=(), hash_cache=None):
if hash_cache is None:
hash_cache = {}
self._hash_cache = hash_cache
self.color = color
self.nodes = nodes
self.hashfunc = hashfunc
self._hash_color = None
def __str__(self):
nodes, color = self.key()
return "Color %s (%s nodes)" % (color, nodes)
def key(self):
return (len(self.nodes), self.hash_color())
def hash_color(self, color=None):
if color is None:
color = self.color
if color in self._hash_cache:
return self._hash_cache[color]
def stringify(x):
if isinstance(x, Node):
return x.n3()
else:
return text_type(x)
if isinstance(color, Node):
return stringify(color)
value = 0
for triple in color:
value += self.hashfunc(' '.join([stringify(x) for x in triple]))
val = u"%x" % value
self._hash_cache[color] = val
return val
def distinguish(self, W, graph):
colors = {}
for n in self.nodes:
new_color = list(self.color)
for node in W.nodes:
new_color += [
(1, p, W.hash_color())
for s, p, o in graph.triples((n, None, node))]
new_color += [
(W.hash_color(), p, 3)
for s, p, o in graph.triples((node, None, n))]
new_color = tuple(new_color)
new_hash_color = self.hash_color(new_color)
if new_hash_color not in colors:
c = Color(
[], self.hashfunc, new_color,
hash_cache=self._hash_cache)
colors[new_hash_color] = c
colors[new_hash_color].nodes.append(n)
return colors.values()
def discrete(self):
return len(self.nodes) == 1
def copy(self):
return Color(
self.nodes[:], self.hashfunc, self.color,
hash_cache=self._hash_cache)
class _TripleCanonicalizer(object):
def __init__(self, graph, hashfunc=sha256):
self.graph = graph
def _hashfunc(s):
h = hashfunc()
h.update(text_type(s).encode("utf8"))
return int(h.hexdigest(), 16)
self._hash_cache = {}
self.hashfunc = _hashfunc
def _discrete(self, coloring):
return len([c for c in coloring if not c.discrete()]) == 0
def _initial_color(self):
"""Finds an initial color for the graph.
Finds an initial color fo the graph by finding all blank nodes and
non-blank nodes that are adjacent. Nodes that are not adjacent to blank
nodes are not included, as they are a) already colored (by URI or literal)
and b) do not factor into the color of any blank node.
"""
bnodes = set()
others = set()
self._neighbors = defaultdict(set)
for s, p, o in self.graph:
nodes = set([s, p, o])
b = set([x for x in nodes if isinstance(x, BNode)])
if len(b) > 0:
others |= nodes - b
bnodes |= b
if isinstance(s, BNode):
self._neighbors[s].add(o)
if isinstance(o, BNode):
self._neighbors[o].add(s)
if isinstance(p, BNode):
self._neighbors[p].add(s)
self._neighbors[p].add(p)
if len(bnodes) > 0:
return [
Color(list(bnodes), self.hashfunc, hash_cache=self._hash_cache)
] + [
Color([x], self.hashfunc, x, hash_cache=self._hash_cache)
for x in others
]
else:
return []
def _individuate(self, color, individual):
new_color = list(color.color)
new_color.append((len(color.nodes),))
color.nodes.remove(individual)
c = Color([individual], self.hashfunc, tuple(new_color),
hash_cache=self._hash_cache)
return c
def _get_candidates(self, coloring):
candidates = [c for c in coloring if not c.discrete()]
for c in [c for c in coloring if not c.discrete()]:
for node in c.nodes:
yield node, c
def _refine(self, coloring, sequence):
sequence = sorted(sequence, key=lambda x: x.key(), reverse=True)
coloring = coloring[:]
while len(sequence) > 0 and not self._discrete(coloring):
W = sequence.pop()
for c in coloring[:]:
if len(c.nodes) > 1 or isinstance(c.nodes[0], BNode):
colors = sorted(c.distinguish(W, self.graph),
key=lambda x: x.key(),
reverse=True)
coloring.remove(c)
coloring.extend(colors)
try:
si = sequence.index(c)
sequence = sequence[:si] + colors + sequence[si+1:]
except ValueError:
sequence = colors[1:] + sequence
combined_colors = []
combined_color_map = dict()
for color in coloring:
color_hash = color.hash_color()
# This is a hash collision, and be combined into a single color for individuation.
if color_hash in combined_color_map:
combined_color_map[color_hash].nodes.extend(color.nodes)
else:
combined_colors.append(color)
combined_color_map[color_hash] = color
return combined_colors
@_runtime("to_hash_runtime")
def to_hash(self, stats=None):
result = 0
for triple in self.canonical_triples(stats=stats):
result += self.hashfunc(' '.join([x.n3() for x in triple]))
if stats is not None:
stats['graph_digest'] = "%x" % result
return result
def _experimental_path(self, coloring):
coloring = [c.copy() for c in coloring]
while not self._discrete(coloring):
color = [x for x in coloring if not x.discrete()][0]
node = color.nodes[0]
new_color = self._individuate(color, node)
coloring.append(new_color)
coloring = self._refine(coloring, [new_color])
return coloring
def _create_generator(self, colorings, groupings=None):
if not groupings:
groupings = defaultdict(set)
for group in zip(*colorings):
g = set([c.nodes[0] for c in group])
for n in group:
g |= groupings[n]
for n in g:
groupings[n] = g
return groupings
@_call_count("individuations")
def _traces(self, coloring, stats=None, depth=[0]):
if stats is not None and 'prunings' not in stats:
stats['prunings'] = 0
depth[0] += 1
candidates = self._get_candidates(coloring)
best = []
best_score = None
best_experimental = None
best_experimental_score = None
last_coloring = None
generator = defaultdict(set)
visited = set()
for candidate, color in candidates:
if candidate in generator:
v = generator[candidate] & visited
if len(v) > 0:
visited.add(candidate)
continue
visited.add(candidate)
coloring_copy = []
color_copy = None
for c in coloring:
c_copy = c.copy()
coloring_copy.append(c_copy)
if c == color:
color_copy = c_copy
new_color = self._individuate(color_copy, candidate)
coloring_copy.append(new_color)
refined_coloring = self._refine(coloring_copy, [new_color])
color_score = tuple([c.key() for c in refined_coloring])
experimental = self._experimental_path(coloring_copy)
experimental_score = set([c.key() for c in experimental])
if last_coloring:
generator = self._create_generator(
[last_coloring, experimental],
generator)
last_coloring = experimental
if best_score is None or best_score < color_score:
best = [refined_coloring]
best_score = color_score
best_experimental = experimental
best_experimental_score = experimental_score
elif best_score > color_score:
# prune this branch.
if stats is not None:
stats['prunings'] += 1
elif experimental_score != best_experimental_score:
best.append(refined_coloring)
else:
# prune this branch.
if stats is not None:
stats['prunings'] += 1
discrete = [x for x in best if self._discrete(x)]
if len(discrete) == 0:
best_score = None
best_depth = None
for coloring in best:
d = [depth[0]]
new_color = self._traces(coloring, stats=stats, depth=d)
color_score = tuple([c.key() for c in refined_coloring])
if best_score is None or color_score > best_score:
discrete = [new_color]
best_score = color_score
best_depth = d[0]
depth[0] = best_depth
return discrete[0]
def canonical_triples(self, stats=None):
if stats is not None:
start_canonicalization = datetime.now()
if stats is not None:
start_coloring = datetime.now()
coloring = self._initial_color()
if stats is not None:
stats['triple_count'] = len(self.graph)
stats['adjacent_nodes'] = max(0, len(coloring) - 1)
coloring = self._refine(coloring, coloring[:])
if stats is not None:
stats['initial_coloring_runtime'] = _total_seconds(datetime.now() - start_coloring)
stats['initial_color_count'] = len(coloring)
if not self._discrete(coloring):
depth = [0]
coloring = self._traces(coloring, stats=stats, depth=depth)
if stats is not None:
stats['tree_depth'] = depth[0]
elif stats is not None:
stats['individuations'] = 0
stats['tree_depth'] = 0
if stats is not None:
stats['color_count'] = len(coloring)
bnode_labels = dict([(c.nodes[0], c.hash_color()) for c in coloring])
if stats is not None:
stats["canonicalize_triples_runtime"] = _total_seconds(datetime.now() - start_coloring)
for triple in self.graph:
result = tuple(self._canonicalize_bnodes(triple, bnode_labels))
yield result
def _canonicalize_bnodes(self, triple, labels):
for term in triple:
if isinstance(term, BNode):
yield BNode(value="cb%s" % labels[term])
else:
yield term
def to_isomorphic(graph):
if isinstance(graph, IsomorphicGraph):
return graph
result = IsomorphicGraph()
if hasattr(graph, "identifier"):
result = IsomorphicGraph(identifier=graph.identifier)
result += graph
return result
def isomorphic(graph1, graph2):
"""Compare graph for equality.
Uses an algorithm to compute unique hashes which takes bnodes into account.
Examples::
>>> g1 = Graph().parse(format='n3', data='''
... @prefix : <http://example.org/ns#> .
... <http://example.org> :rel <http://example.org/a> .
... <http://example.org> :rel <http://example.org/b> .
... <http://example.org> :rel [ :label "A bnode." ] .
... ''')
>>> g2 = Graph().parse(format='n3', data='''
... @prefix ns: <http://example.org/ns#> .
... <http://example.org> ns:rel [ ns:label "A bnode." ] .
... <http://example.org> ns:rel <http://example.org/b>,
... <http://example.org/a> .
... ''')
>>> isomorphic(g1, g2)
True
>>> g3 = Graph().parse(format='n3', data='''
... @prefix : <http://example.org/ns#> .
... <http://example.org> :rel <http://example.org/a> .
... <http://example.org> :rel <http://example.org/b> .
... <http://example.org> :rel <http://example.org/c> .
... ''')
>>> isomorphic(g1, g3)
False
"""
gd1 = _TripleCanonicalizer(graph1).to_hash()
gd2 = _TripleCanonicalizer(graph2).to_hash()
return gd1 == gd2
def to_canonical_graph(g1, stats=None):
"""Creates a canonical, read-only graph.
Creates a canonical, read-only graph where all bnode id:s are based on
deterministical SHA-256 checksums, correlated with the graph contents.
"""
graph = Graph()
graph += _TripleCanonicalizer(g1).canonical_triples(stats=stats)
return ReadOnlyGraphAggregate([graph])
def graph_diff(g1, g2):
"""Returns three sets of triples: "in both", "in first" and "in second"."""
# bnodes have deterministic values in canonical graphs:
cg1 = to_canonical_graph(g1)
cg2 = to_canonical_graph(g2)
in_both = cg1 * cg2
in_first = cg1 - cg2
in_second = cg2 - cg1
return (in_both, in_first, in_second)
_MOCK_BNODE = BNode()
def similar(g1, g2):
"""Checks if the two graphs are "similar".
Checks if the two graphs are "similar", by comparing sorted triples where
all bnodes have been replaced by a singular mock bnode (the
``_MOCK_BNODE``).
This is a much cheaper, but less reliable, alternative to the comparison
algorithm in ``isomorphic``.
"""
return all(t1 == t2 for (t1, t2) in _squashed_graphs_triples(g1, g2))
def _squashed_graphs_triples(g1, g2):
for (t1, t2) in zip(sorted(_squash_graph(g1)), sorted(_squash_graph(g2))):
yield t1, t2
def _squash_graph(graph):
return (_squash_bnodes(triple) for triple in graph)
def _squash_bnodes(triple):
return tuple((isinstance(t, BNode) and _MOCK_BNODE) or t for t in triple)