Skip to content

Commit

Permalink
Merge pull request #931 from mwatts15/357-batch-add-graph
Browse files Browse the repository at this point in the history
Adding a wrapper for batching add() calls to a Graph
  • Loading branch information
nicholascar committed May 3, 2020
2 parents 981a5ba + 7c28713 commit d52a378
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 0 deletions.
68 changes: 68 additions & 0 deletions rdflib/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@
"Dataset",
"UnSupportedAggregateOperation",
"ReadOnlyGraphAggregate",
"BatchAddGraph",
]


Expand Down Expand Up @@ -2013,6 +2014,73 @@ def _assertnode(*terms):
return True


class BatchAddGraph(object):
    '''
    Wrapper around graph that turns calls to :meth:`add` (and optionally, :meth:`addN`)
    into calls to :meth:`~rdflib.graph.Graph.addN`.

    :Parameters:

      - `graph`: The graph to wrap
      - `batch_size`: The maximum number of triples to buffer before passing to
        `graph`'s `addN`; must be greater than 1
      - `batch_addn`: If True, then even calls to :meth:`addN` will be batched according to
        `batch_size`

    :ivar graph: The wrapped graph
    :ivar count: The number of triples seen (buffered or flushed) since initialization
        or the last call to :meth:`reset`
    :ivar batch: The current buffer of triples
    '''

    def __init__(self, graph, batch_size=1000, batch_addn=False):
        # A batch_size of 1 defeats the purpose of batching (every add would
        # flush), and 0/None would never flush, so require at least 2.
        if not batch_size or batch_size < 2:
            raise ValueError("batch_size must be a number greater than 1")
        self.graph = graph
        self.__graph_tuple = (graph,)
        self.__batch_size = batch_size
        self.__batch_addn = batch_addn
        self.reset()

    def reset(self):
        '''
        Manually clear the buffered triples and reset the count to zero
        '''
        self.batch = []
        self.count = 0

    def add(self, triple_or_quad):
        '''
        Add a triple (or quad) to the buffer, flushing the buffer to the
        wrapped graph's :meth:`addN` whenever it reaches `batch_size`.

        :param triple_or_quad: The triple or quad to add
        '''
        # Flush *before* appending so the buffer never exceeds batch_size.
        if len(self.batch) >= self.__batch_size:
            self.graph.addN(self.batch)
            self.batch = []
        self.count += 1
        if len(triple_or_quad) == 3:
            # Bare triples are placed in the wrapped graph's own context.
            self.batch.append(triple_or_quad + self.__graph_tuple)
        else:
            self.batch.append(triple_or_quad)

    def addN(self, quads):
        '''
        Add a sequence of quads; batched through :meth:`add` only if
        `batch_addn` was set, otherwise passed straight through.

        :param quads: An iterable of quads to add
        '''
        if self.__batch_addn:
            for q in quads:
                self.add(q)
        else:
            self.graph.addN(quads)

    def __enter__(self):
        self.reset()
        return self

    def __exit__(self, *exc):
        # Only flush the remaining partial batch on a clean exit: the
        # exception may well have been caused by adding the current batch,
        # so retrying it on the way out would be wrong.
        if exc[0] is None:
            self.graph.addN(self.batch)


def test():
import doctest

Expand Down
89 changes: 89 additions & 0 deletions test/test_batch_add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import unittest
from rdflib.graph import Graph, BatchAddGraph
from rdflib.term import URIRef


class TestBatchAddGraph(unittest.TestCase):
    '''
    Tests for :class:`rdflib.graph.BatchAddGraph`.
    '''

    def test_batch_size_zero_denied(self):
        with self.assertRaises(ValueError):
            BatchAddGraph(Graph(), batch_size=0)

    def test_batch_size_none_denied(self):
        with self.assertRaises(ValueError):
            BatchAddGraph(Graph(), batch_size=None)

    def test_batch_size_one_denied(self):
        with self.assertRaises(ValueError):
            BatchAddGraph(Graph(), batch_size=1)

    def test_batch_size_negative_denied(self):
        with self.assertRaises(ValueError):
            BatchAddGraph(Graph(), batch_size=-12)

    def test_exit_submits_partial_batch(self):
        trip = (URIRef('a'), URIRef('b'), URIRef('c'))
        g = Graph()
        with BatchAddGraph(g, batch_size=10) as cut:
            cut.add(trip)
        self.assertIn(trip, g)

    def test_add_more_than_batch_size(self):
        trips = [(URIRef('a'), URIRef('b%d' % i), URIRef('c%d' % i))
                 for i in range(12)]
        g = Graph()
        with BatchAddGraph(g, batch_size=10) as cut:
            for trip in trips:
                cut.add(trip)
        self.assertEqual(12, len(g))

    def test_add_quad_for_non_conjunctive_empty(self):
        '''
        Graph drops quads that don't match our graph. Make sure we do the same
        '''
        g = Graph(identifier='http://example.org/g')
        badg = Graph(identifier='http://example.org/badness')
        with BatchAddGraph(g) as cut:
            cut.add((URIRef('a'), URIRef('b'), URIRef('c'), badg))
        self.assertEqual(0, len(g))

    def test_add_quad_for_non_conjunctive_pass_on_context_matches(self):
        g = Graph()
        with BatchAddGraph(g) as cut:
            cut.add((URIRef('a'), URIRef('b'), URIRef('c'), g))
        self.assertEqual(1, len(g))

    def test_no_addN_on_exception(self):
        '''
        Even if we've added triples so far, it may be that attempting to add the last
        batch is the cause of our exception, so we don't want to attempt again
        '''
        g = Graph()
        trips = [(URIRef('a'), URIRef('b%d' % i), URIRef('c%d' % i))
                 for i in range(12)]

        raised = False
        try:
            with BatchAddGraph(g, batch_size=10) as cut:
                for i, trip in enumerate(trips):
                    cut.add(trip)
                    if i == 11:
                        raise Exception('myexc')
        except Exception as e:
            # Only the deliberately raised exception is expected; anything
            # else is a real failure and must propagate.
            if str(e) != 'myexc':
                raise
            raised = True
        self.assertTrue(raised, 'expected the planted exception to be raised')
        self.assertEqual(10, len(g))

    def test_addN_batching_addN(self):
        class MockGraph(object):
            def __init__(self):
                self.counts = []

            def addN(self, quads):
                self.counts.append(sum(1 for _ in quads))

        g = MockGraph()
        quads = [(URIRef('a'), URIRef('b%d' % i), URIRef('c%d' % i), g)
                 for i in range(12)]

        with BatchAddGraph(g, batch_size=10, batch_addn=True) as cut:
            cut.addN(quads)
        self.assertEqual(g.counts, [10, 2])

0 comments on commit d52a378

Please sign in to comment.