From 760b3ba8e5f222c574af0b1d7779b4660d8b7471 Mon Sep 17 00:00:00 2001 From: Andrew Woods Date: Mon, 24 Nov 2014 13:00:49 -0500 Subject: [PATCH] Ensure delete preceeds update from SparqlIndexer Resolves: https://www.pivotaltracker.com/story/show/83268844 --- .../fcrepo/indexer/sparql/SparqlIndexer.java | 66 ++++++++++++------- .../indexer/sparql/SparqlIndexerTest.java | 9 +++ 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/fcrepo-message-consumer-core/src/main/java/org/fcrepo/indexer/sparql/SparqlIndexer.java b/fcrepo-message-consumer-core/src/main/java/org/fcrepo/indexer/sparql/SparqlIndexer.java index c515ce8..85bd00e 100644 --- a/fcrepo-message-consumer-core/src/main/java/org/fcrepo/indexer/sparql/SparqlIndexer.java +++ b/fcrepo-message-consumer-core/src/main/java/org/fcrepo/indexer/sparql/SparqlIndexer.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.Callable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.ListeningExecutorService; import com.hp.hpl.jena.rdf.model.Model; @@ -80,7 +81,7 @@ public class SparqlIndexer extends AsynchIndexer { @Override public Callable updateSynch(final URI pid, final Model model) { LOGGER.debug("Received update for: {}", pid); - removeSynch(pid); + removeSynch(pid, true); // build a list of triples final StmtIterator triples = model.listStatements(); final QuadDataAcc add = new QuadDataAcc(); @@ -99,6 +100,11 @@ public Callable updateSynch(final URI pid, final Model model) { **/ @Override public Callable removeSynch(final URI subject) { + return removeSynch(subject, false); + } + + @VisibleForTesting + protected Callable removeSynch(final URI subject, final boolean blocking) { LOGGER.debug("Received remove for: {}", subject); // find triples/quads to delete @@ -138,7 +144,7 @@ public Callable removeSynch(final URI subject) { } // send updates - return exec(del); + return exec(del, blocking); } /** @@ -156,6 +162,10 @@ private boolean matches( final URI resource, final String candidate) { } private Callable exec(final UpdateRequest update) { + return exec(update, false); + } + + private Callable exec(final UpdateRequest update, final boolean blocking) { if (update.getOperations().isEmpty()) { LOGGER.debug("Received empty update/remove operation."); return new Callable() { @@ -193,30 +203,40 @@ public Void call() { } }; - final ListenableFutureTask task = - ListenableFutureTask.create(callable); - task.addListener(new Runnable() { + if (blocking) { + try { + callable.call(); + } catch (Exception e) { + LOGGER.error("Error calling Sparql update/remove!, {}", e.getMessage()); + } - @Override - public void run() { - LOGGER.debug("Completed Sparql update/removal."); - if (LOGGER.isTraceEnabled()) { - try ( - final OutputStream buffer = new ByteArrayOutputStream()) { - final IndentedWriter out = new IndentedWriter(buffer); - update.output(out); - LOGGER.trace("Executed update/remove operation:\n{}", - buffer.toString()); - out.close(); - } catch (final IOException e) { - LOGGER.error( - "Couldn't retrieve execution of update/remove operation!", - e); + } else { + final ListenableFutureTask task = + ListenableFutureTask.create(callable); + task.addListener(new Runnable() { + + @Override + public void run() { + LOGGER.debug("Completed Sparql update/removal."); + if (LOGGER.isTraceEnabled()) { + try ( + final OutputStream buffer = new ByteArrayOutputStream()) { + final IndentedWriter out = new IndentedWriter(buffer); + update.output(out); + LOGGER.trace("Executed update/remove operation:\n{}", + buffer.toString()); + out.close(); + } catch (final IOException e) { + LOGGER.error( + "Couldn't retrieve execution of update/remove operation!", + e); + } } } - } - }, executorService); - executorService.submit(task); + }, executorService); + executorService.submit(task); + } + return callable; } diff --git a/fcrepo-message-consumer-core/src/test/java/org/fcrepo/indexer/sparql/SparqlIndexerTest.java b/fcrepo-message-consumer-core/src/test/java/org/fcrepo/indexer/sparql/SparqlIndexerTest.java index b50dcdb..2750246 100644 --- a/fcrepo-message-consumer-core/src/test/java/org/fcrepo/indexer/sparql/SparqlIndexerTest.java +++ b/fcrepo-message-consumer-core/src/test/java/org/fcrepo/indexer/sparql/SparqlIndexerTest.java @@ -63,7 +63,16 @@ public void testGetIndexerType() { @Test public void testRemoveSynch() throws URISyntaxException { testIndexer.removeSynch(new URI("info://obj-0")); + doTestRemoveSynch(); + } + + @Test + public void testRemoveSynchBlocking() throws URISyntaxException { + testIndexer.removeSynch(new URI("info://obj-0"), true); + doTestRemoveSynch(); + } + private void doTestRemoveSynch() { final String cmd0 = "DELETE WHERE { <" + createURI("info://obj-0") + "> ?p ?o }"; final String cmd2 = "DELETE WHERE { <" + createURI("info://obj-0/child") + "> ?p ?o }"; Mockito.verify(updateRequest).add(cmd0);