Skip to content

Commit

Permalink
#630 added synchronisation on futures list to avoid race condition wh…
Browse files Browse the repository at this point in the history
…en checking for active operations

Signed-off-by: Jeen Broekstra <jeen.broekstra@gmail.com>
  • Loading branch information
abrokenjester committed Oct 28, 2016
1 parent 5880ad6 commit 6c9b517
Showing 1 changed file with 143 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ UUID getID() {
void begin(IsolationLevel level)
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
txnConnection.begin(level);
return true;
});
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
txnConnection.begin(level);
return true;
});

futures.add(result);
futures.add(result);
}
result.get();
}

Expand All @@ -127,12 +130,15 @@ void begin(IsolationLevel level)
void rollback()
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
txnConnection.rollback();
return true;
});
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
txnConnection.rollback();
return true;
});

futures.add(result);
futures.add(result);
}
result.get();
}

Expand All @@ -143,13 +149,15 @@ void rollback()
void commit()
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
txnConnection.commit();
return true;
});

futures.add(result);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
txnConnection.commit();
return true;
});

futures.add(result);
}
result.get();
}

Expand All @@ -172,9 +180,11 @@ void commit()
Query prepareQuery(QueryLanguage queryLn, String queryStr, String baseURI)
throws InterruptedException, ExecutionException
{
Future<Query> result = executor.submit(() -> txnConnection.prepareQuery(queryLn, queryStr, baseURI));

futures.add(result);
Future<Query> result;
synchronized (futures) {
result = executor.submit(() -> txnConnection.prepareQuery(queryLn, queryStr, baseURI));
futures.add(result);
}
return result.get();
}

Expand All @@ -192,8 +202,11 @@ Query prepareQuery(QueryLanguage queryLn, String queryStr, String baseURI)
TupleQueryResult evaluate(TupleQuery tQuery)
throws InterruptedException, ExecutionException
{
Future<TupleQueryResult> result = executor.submit(() -> tQuery.evaluate());
futures.add(result);
Future<TupleQueryResult> result;
synchronized (futures) {
result = executor.submit(() -> tQuery.evaluate());
futures.add(result);
}
return result.get();
}

Expand All @@ -211,8 +224,11 @@ TupleQueryResult evaluate(TupleQuery tQuery)
GraphQueryResult evaluate(GraphQuery gQuery)
throws InterruptedException, ExecutionException
{
Future<GraphQueryResult> result = executor.submit(() -> gQuery.evaluate());
futures.add(result);
Future<GraphQueryResult> result;
synchronized (futures) {
result = executor.submit(() -> gQuery.evaluate());
futures.add(result);
}
return result.get();
}

Expand All @@ -230,8 +246,11 @@ GraphQueryResult evaluate(GraphQuery gQuery)
boolean evaluate(BooleanQuery bQuery)
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> bQuery.evaluate());
futures.add(result);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> bQuery.evaluate());
futures.add(result);
}
return result.get();
}

Expand All @@ -249,12 +268,14 @@ void exportStatements(Resource subj, IRI pred, Value obj, boolean useInferencing
Resource... contexts)
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
txnConnection.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts);
return true;
});

futures.add(result);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
txnConnection.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts);
return true;
});
futures.add(result);
}
result.get();
}

Expand All @@ -269,8 +290,11 @@ void exportStatements(Resource subj, IRI pred, Value obj, boolean useInferencing
long getSize(Resource[] contexts)
throws InterruptedException, ExecutionException
{
Future<Long> result = executor.submit(() -> txnConnection.size(contexts));
futures.add(result);
Future<Long> result;
synchronized (futures) {
result = executor.submit(() -> txnConnection.size(contexts));
futures.add(result);
}
return result.get();
}

Expand All @@ -288,32 +312,34 @@ void add(InputStream inputStream, String baseURI, RDFFormat format, boolean pres
Resource... contexts)
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
try {
if (preserveBNodes) {
// create a reconfigured parser + inserter instead of relying on standard
// repositoryconn add method.
RDFParser parser = Rio.createParser(format);
parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
RDFInserter inserter = new RDFInserter(txnConnection);
inserter.setPreserveBNodeIDs(true);
if (contexts.length > 0) {
inserter.enforceContext(contexts);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
try {
if (preserveBNodes) {
// create a reconfigured parser + inserter instead of relying on standard
// repositoryconn add method.
RDFParser parser = Rio.createParser(format);
parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
RDFInserter inserter = new RDFInserter(txnConnection);
inserter.setPreserveBNodeIDs(true);
if (contexts.length > 0) {
inserter.enforceContext(contexts);
}
parser.setRDFHandler(inserter);
parser.parse(inputStream, baseURI);
}
parser.setRDFHandler(inserter);
parser.parse(inputStream, baseURI);
else {
txnConnection.add(inputStream, baseURI, format, contexts);
}
return true;
}
else {
txnConnection.add(inputStream, baseURI, format, contexts);
catch (IOException e) {
throw new RuntimeException(e);
}
return true;
}
catch (IOException e) {
throw new RuntimeException(e);
}
});

futures.add(result);
});
futures.add(result);
}
result.get();
}

Expand All @@ -327,23 +353,25 @@ void add(InputStream inputStream, String baseURI, RDFFormat format, boolean pres
void delete(RDFFormat contentType, InputStream inputStream, String baseURI)
throws InterruptedException, ExecutionException
{
Future<Boolean> exception = executor.submit(() -> {
RDFParser parser = Rio.createParser(contentType, txnConnection.getValueFactory());

parser.setRDFHandler(new WildcardRDFRemover(txnConnection));
parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
try {
parser.parse(inputStream, baseURI);
return true;
}
catch (IOException e) {
throw new RuntimeException(e);
}
});

futures.add(exception);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
RDFParser parser = Rio.createParser(contentType, txnConnection.getValueFactory());

parser.setRDFHandler(new WildcardRDFRemover(txnConnection));
parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
try {
parser.parse(inputStream, baseURI);
return true;
}
catch (IOException e) {
throw new RuntimeException(e);
}
});

exception.get();
futures.add(result);
}
result.get();
}

/**
Expand All @@ -360,28 +388,33 @@ void executeUpdate(QueryLanguage queryLn, String sparqlUpdateString, String base
boolean includeInferred, Dataset dataset, Map<String, Value> bindings)
throws InterruptedException, ExecutionException
{
Future<Boolean> result = executor.submit(() -> {
Update update = txnConnection.prepareUpdate(queryLn, sparqlUpdateString);
update.setIncludeInferred(includeInferred);
if (dataset != null) {
update.setDataset(dataset);
}
for (String bindingName : bindings.keySet()) {
update.setBinding(bindingName, bindings.get(bindingName));
}
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
Update update = txnConnection.prepareUpdate(queryLn, sparqlUpdateString);
update.setIncludeInferred(includeInferred);
if (dataset != null) {
update.setDataset(dataset);
}
for (String bindingName : bindings.keySet()) {
update.setBinding(bindingName, bindings.get(bindingName));
}

update.execute();
return true;
});
update.execute();
return true;
});

futures.add(result);
futures.add(result);
}
result.get();
}

boolean hasActiveOperations() {
for (Future future : futures) {
if (!future.isDone()) {
return true;
synchronized (futures) {
for (Future future : futures) {
if (!future.isDone()) {
return true;
}
}
}
return false;
Expand All @@ -398,11 +431,14 @@ void close()
{
if (isClosed.compareAndSet(false, true)) {
try {
Future<Boolean> result = executor.submit(() -> {
txnConnection.close();
return true;
});
futures.add(result);
Future<Boolean> result;
synchronized (futures) {
result = executor.submit(() -> {
txnConnection.close();
return true;
});
futures.add(result);
}
result.get();
}
finally {
Expand All @@ -415,17 +451,20 @@ private RepositoryConnection getTransactionConnection()
throws InterruptedException, ExecutionException
{
// create a new RepositoryConnection with correct parser settings
Future<RepositoryConnection> future = executor.submit(() -> {
RepositoryConnection conn = rep.getConnection();
ParserConfig config = conn.getParserConfig();
config.set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
config.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES);
config.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS);

return conn;
});

futures.add(future);
Future<RepositoryConnection> future;
synchronized (futures) {
future = executor.submit(() -> {
RepositoryConnection conn = rep.getConnection();
ParserConfig config = conn.getParserConfig();
config.set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
config.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES);
config.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS);

return conn;
});

futures.add(future);
}
return future.get();
}

Expand Down

0 comments on commit 6c9b517

Please sign in to comment.