Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Ensure delete preceeds update from SparqlIndexer
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Woods committed Nov 24, 2014
1 parent 50198de commit 760b3ba
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.Callable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.hp.hpl.jena.rdf.model.Model;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class SparqlIndexer extends AsynchIndexer<Model, Void> {
@Override
public Callable<Void> updateSynch(final URI pid, final Model model) {
LOGGER.debug("Received update for: {}", pid);
removeSynch(pid);
removeSynch(pid, true);
// build a list of triples
final StmtIterator triples = model.listStatements();
final QuadDataAcc add = new QuadDataAcc();
Expand All @@ -99,6 +100,11 @@ public Callable<Void> updateSynch(final URI pid, final Model model) {
**/
@Override
public Callable<Void> removeSynch(final URI subject) {
return removeSynch(subject, false);
}

@VisibleForTesting
protected Callable<Void> removeSynch(final URI subject, final boolean blocking) {

LOGGER.debug("Received remove for: {}", subject);
// find triples/quads to delete
Expand Down Expand Up @@ -138,7 +144,7 @@ public Callable<Void> removeSynch(final URI subject) {
}

// send updates
return exec(del);
return exec(del, blocking);
}

/**
Expand All @@ -156,6 +162,10 @@ private boolean matches( final URI resource, final String candidate) {
}

private Callable<Void> exec(final UpdateRequest update) {
return exec(update, false);
}

private Callable<Void> exec(final UpdateRequest update, final boolean blocking) {
if (update.getOperations().isEmpty()) {
LOGGER.debug("Received empty update/remove operation.");
return new Callable<Void>() {
Expand Down Expand Up @@ -193,30 +203,40 @@ public Void call() {
}
};

final ListenableFutureTask<Void> task =
ListenableFutureTask.create(callable);
task.addListener(new Runnable() {
if (blocking) {
try {
callable.call();
} catch (Exception e) {
LOGGER.error("Error calling Sparql update/remove!, {}", e.getMessage());
}

@Override
public void run() {
LOGGER.debug("Completed Sparql update/removal.");
if (LOGGER.isTraceEnabled()) {
try (
final OutputStream buffer = new ByteArrayOutputStream()) {
final IndentedWriter out = new IndentedWriter(buffer);
update.output(out);
LOGGER.trace("Executed update/remove operation:\n{}",
buffer.toString());
out.close();
} catch (final IOException e) {
LOGGER.error(
"Couldn't retrieve execution of update/remove operation!",
e);
} else {
final ListenableFutureTask<Void> task =
ListenableFutureTask.create(callable);
task.addListener(new Runnable() {

@Override
public void run() {
LOGGER.debug("Completed Sparql update/removal.");
if (LOGGER.isTraceEnabled()) {
try (
final OutputStream buffer = new ByteArrayOutputStream()) {
final IndentedWriter out = new IndentedWriter(buffer);
update.output(out);
LOGGER.trace("Executed update/remove operation:\n{}",
buffer.toString());
out.close();
} catch (final IOException e) {
LOGGER.error(
"Couldn't retrieve execution of update/remove operation!",
e);
}
}
}
}
}, executorService);
executorService.submit(task);
}, executorService);
executorService.submit(task);
}

return callable;
}

Expand Down
Expand Up @@ -63,7 +63,16 @@ public void testGetIndexerType() {
@Test
public void testRemoveSynch() throws URISyntaxException {
testIndexer.removeSynch(new URI("info://obj-0"));
doTestRemoveSynch();
}

@Test
public void testRemoveSynchBlocking() throws URISyntaxException {
testIndexer.removeSynch(new URI("info://obj-0"), true);
doTestRemoveSynch();
}

private void doTestRemoveSynch() {
final String cmd0 = "DELETE WHERE { <" + createURI("info://obj-0") + "> ?p ?o }";
final String cmd2 = "DELETE WHERE { <" + createURI("info://obj-0/child") + "> ?p ?o }";
Mockito.verify(updateRequest).add(cmd0);
Expand Down

0 comments on commit 760b3ba

Please sign in to comment.