From ad6fec806d100397d316a66a9e95f144f48194fb Mon Sep 17 00:00:00 2001 From: Jeen Broekstra Date: Fri, 28 Oct 2016 11:04:32 +1100 Subject: [PATCH] #630 simplified ActiveTransactionRegistry now keeps track of dedicated txn threads Signed-off-by: Jeen Broekstra --- .../server/repository/QueryResultView.java | 13 +- .../statements/ExportStatementsView.java | 83 ++- .../ActiveTransactionRegistry.java | 190 ++----- .../repository/transaction/Transaction.java | 475 ++++++++++++++++++ .../transaction/TransactionController.java | 300 ++++------- .../TransactionExportStatementsView.java | 118 +++++ .../TransactionStartController.java | 18 +- .../TestActiveTransactionRegistry.java | 117 +---- 8 files changed, 792 insertions(+), 522 deletions(-) create mode 100644 core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/Transaction.java create mode 100644 core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionExportStatementsView.java diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/QueryResultView.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/QueryResultView.java index f058a3b577f..dba09d60ebd 100644 --- a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/QueryResultView.java +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/QueryResultView.java @@ -12,13 +12,11 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.Map; -import java.util.UUID; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.rdf4j.common.lang.FileFormat; -import org.eclipse.rdf4j.http.server.repository.transaction.ActiveTransactionRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.servlet.View; @@ -63,16 +61,7 @@ public abstract class QueryResultView implements View { public final void render(Map model, HttpServletRequest request, HttpServletResponse response) throws IOException { - UUID txnId = null; - try { - txnId = (UUID)model.get(TRANSACTION_ID_KEY); - renderInternal(model, request, response); - } - finally { - if (txnId != null) { - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(txnId); - } - } + renderInternal(model, request, response); } @SuppressWarnings("rawtypes") diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/statements/ExportStatementsView.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/statements/ExportStatementsView.java index 61023ba52b6..d33c25f2334 100644 --- a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/statements/ExportStatementsView.java +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/statements/ExportStatementsView.java @@ -12,14 +12,12 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.util.Map; -import java.util.UUID; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.rdf4j.http.server.ServerHTTPException; import org.eclipse.rdf4j.http.server.repository.RepositoryInterceptor; -import org.eclipse.rdf4j.http.server.repository.transaction.ActiveTransactionRegistry; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.Value; @@ -74,62 +72,53 @@ public String getContentType() { public void render(Map model, HttpServletRequest request, HttpServletResponse response) throws Exception { - UUID txnId = null; - try { - txnId = (UUID)model.get(TRANSACTION_ID_KEY); - Resource subj = (Resource)model.get(SUBJECT_KEY); - IRI pred = (IRI)model.get(PREDICATE_KEY); - Value obj = (Value)model.get(OBJECT_KEY); - Resource[] contexts = (Resource[])model.get(CONTEXTS_KEY); - boolean useInferencing = (Boolean)model.get(USE_INFERENCING_KEY); - RepositoryConnection conn = (RepositoryConnection)model.get(CONNECTION_KEY); + Resource subj = (Resource)model.get(SUBJECT_KEY); + IRI pred = (IRI)model.get(PREDICATE_KEY); + Value obj = (Value)model.get(OBJECT_KEY); + Resource[] contexts = (Resource[])model.get(CONTEXTS_KEY); + boolean useInferencing = (Boolean)model.get(USE_INFERENCING_KEY); + RepositoryConnection conn = (RepositoryConnection)model.get(CONNECTION_KEY); - boolean headersOnly = (Boolean)model.get(HEADERS_ONLY); + boolean headersOnly = (Boolean)model.get(HEADERS_ONLY); - RDFWriterFactory rdfWriterFactory = (RDFWriterFactory)model.get(FACTORY_KEY); + RDFWriterFactory rdfWriterFactory = (RDFWriterFactory)model.get(FACTORY_KEY); - RDFFormat rdfFormat = rdfWriterFactory.getRDFFormat(); + RDFFormat rdfFormat = rdfWriterFactory.getRDFFormat(); - try { - OutputStream out = response.getOutputStream(); - RDFWriter rdfWriter = rdfWriterFactory.getWriter(out); + try { + OutputStream out = response.getOutputStream(); + RDFWriter rdfWriter = rdfWriterFactory.getWriter(out); - response.setStatus(SC_OK); + response.setStatus(SC_OK); - String mimeType = rdfFormat.getDefaultMIMEType(); - if (rdfFormat.hasCharset()) { - Charset charset = rdfFormat.getCharset(); - mimeType += "; charset=" + charset.name(); - } - response.setContentType(mimeType); + String mimeType = rdfFormat.getDefaultMIMEType(); + if (rdfFormat.hasCharset()) { + Charset charset = rdfFormat.getCharset(); + mimeType += "; charset=" + charset.name(); + } + response.setContentType(mimeType); + + String filename = "statements"; + if (rdfFormat.getDefaultFileExtension() != null) { + filename += "." + rdfFormat.getDefaultFileExtension(); + } + response.setHeader("Content-Disposition", "attachment; filename=" + filename); - String filename = "statements"; - if (rdfFormat.getDefaultFileExtension() != null) { - filename += "." + rdfFormat.getDefaultFileExtension(); + if (!headersOnly) { + if (conn == null) { + conn = RepositoryInterceptor.getRepositoryConnection(request); } - response.setHeader("Content-Disposition", "attachment; filename=" + filename); - - if (!headersOnly) { - if (conn == null) { - conn = RepositoryInterceptor.getRepositoryConnection(request); - } - synchronized (conn) { - conn.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts); - } + synchronized (conn) { + conn.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts); } - out.close(); - } - catch (RDFHandlerException e) { - throw new ServerHTTPException("Serialization error: " + e.getMessage(), e); - } - catch (RepositoryException e) { - throw new ServerHTTPException("Repository error: " + e.getMessage(), e); } + out.close(); } - finally { - if (txnId != null) { - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(txnId); - } + catch (RDFHandlerException e) { + throw new ServerHTTPException("Serialization error: " + e.getMessage(), e); + } + catch (RepositoryException e) { + throw new ServerHTTPException("Repository error: " + e.getMessage(), e); } } diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/ActiveTransactionRegistry.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/ActiveTransactionRegistry.java index 6f2bf20296e..a93f1ce87ce 100644 --- a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/ActiveTransactionRegistry.java +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/ActiveTransactionRegistry.java @@ -1,20 +1,17 @@ -/******************************************************************************* - * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others. +/** + * Copyright (c) 2016 Eclipse RDF4J contributors. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Distribution License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/org/documents/edl-v10.php. - *******************************************************************************/ + */ package org.eclipse.rdf4j.http.server.repository.transaction; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import org.eclipse.rdf4j.repository.RepositoryConnection; import org.eclipse.rdf4j.repository.RepositoryException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,16 +23,12 @@ import com.google.common.cache.RemovalNotification; /** - * Registry keeping track of active transactions identified by a {@link UUID} and the - * {@link RepositoryConnection} that corresponds to the given transaction. + * Registry keeping track of active transactions identified by a {@link UUID}. * * @author Jeen Broekstra */ public enum ActiveTransactionRegistry { - /** - * Singleton instance - */ INSTANCE; private final Logger logger = LoggerFactory.getLogger(ActiveTransactionRegistry.class); @@ -55,43 +48,17 @@ public enum ActiveTransactionRegistry { * primary cache for transactions, accessible via transaction ID. Cache entries are kept until a * transaction signals it has ended, or until the secondary cache finds an "orphaned" transaction entry. */ - private final Cache primaryCache; + private final Cache primaryCache; /** * The secondary cache does automatic cleanup of its entries based on the configured timeout. If an - * expired entry is no longer locked by any thread, it is considered "orphaned" and discarded from the - * primary cache. + * expired transaction is no longer active, it is considered "orphaned" and discarded from the primary + * cache. */ - private final Cache secondaryCache; - - static class CacheEntry { - - private final RepositoryConnection connection; - - private final ReentrantLock lock = new ReentrantLock(); - - public CacheEntry(RepositoryConnection connection) { - this.connection = connection; - } - - /** - * @return Returns the connection. - */ - public RepositoryConnection getConnection() { - return connection; - } - - /** - * @return Returns the lock. - */ - public ReentrantLock getLock() { - return lock; - } - - } + private final Cache secondaryCache; /** - * private constructor. Access via {@link ActiveTransactionRegistry#INSTANCE} + * private constructor. */ private ActiveTransactionRegistry() { int timeout = DEFAULT_TIMEOUT; @@ -107,29 +74,29 @@ private ActiveTransactionRegistry() { } } - primaryCache = CacheBuilder.newBuilder().removalListener(new RemovalListener() { + primaryCache = CacheBuilder.newBuilder().removalListener(new RemovalListener() { @Override - public void onRemoval(RemovalNotification notification) { - CacheEntry entry = notification.getValue(); + public void onRemoval(RemovalNotification notification) { + Transaction entry = notification.getValue(); try { - entry.getConnection().close(); + entry.close(); } - catch (RepositoryException e) { + catch (RepositoryException | InterruptedException | ExecutionException e) { // fall through } } }).build(); - secondaryCache = CacheBuilder.newBuilder().removalListener(new RemovalListener() { + secondaryCache = CacheBuilder.newBuilder().removalListener(new RemovalListener() { @Override - public void onRemoval(RemovalNotification notification) { + public void onRemoval(RemovalNotification notification) { if (RemovalCause.EXPIRED.equals(notification.getCause())) { final UUID transactionId = notification.getKey(); - final CacheEntry entry = notification.getValue(); + final Transaction entry = notification.getValue(); synchronized (primaryCache) { - if (!entry.getLock().isLocked()) { + if (!entry.hasActiveOperations()) { // no operation active, we can decommission this entry primaryCache.invalidate(transactionId); logger.warn("deregistered expired transaction {}", transactionId); @@ -146,110 +113,51 @@ public void onRemoval(RemovalNotification notification) { } /** - * Register a new transaction with the given id and connection. - * - * @param transactionId - * the transaction id - * @param conn - * the {@link RepositoryConnection} to use for handling the transaction. - * @throws RepositoryException - * if a transaction is already registered with the given transaction id. + * @param txnId + * @param txn */ - public void register(UUID transactionId, RepositoryConnection conn) - throws RepositoryException - { + public void register(Transaction txn) { synchronized (primaryCache) { - if (primaryCache.getIfPresent(transactionId) == null) { - final CacheEntry cacheEntry = new CacheEntry(conn); - primaryCache.put(transactionId, cacheEntry); - secondaryCache.put(transactionId, cacheEntry); - logger.debug("registered transaction {} ", transactionId); + Transaction existingTxn = primaryCache.getIfPresent(txn.getID()); + if (existingTxn == null) { + primaryCache.put(txn.getID(), txn); + secondaryCache.put(txn.getID(), txn); + logger.debug("registered transaction {} ", txn.getID()); } else { - logger.error("transaction already registered: {}", transactionId); + logger.error("transaction already registered: {}", txn.getID()); throw new RepositoryException( - "transaction with id " + transactionId.toString() + " already registered."); + "transaction with id " + txn.getID().toString() + " already registered."); } } } - /** - * Remove the given transaction from the registry - * - * @param transactionId - * the transaction id - * @throws RepositoryException - * if no registered transaction with the given id could be found. - */ - public void deregister(UUID transactionId) - throws RepositoryException - { + public Transaction getTransaction(UUID id) { synchronized (primaryCache) { - CacheEntry entry = primaryCache.getIfPresent(transactionId); + Transaction entry = primaryCache.getIfPresent(id); if (entry == null) { - throw new RepositoryException( - "transaction with id " + transactionId.toString() + " not registered."); - } - else { - primaryCache.invalidate(transactionId); - secondaryCache.invalidate(transactionId); - logger.debug("deregistered transaction {}", transactionId); + throw new RepositoryException("transaction with id " + id.toString() + " not registered."); } + updateSecondaryCache(entry); + return entry; } } /** - * Obtain the {@link RepositoryConnection} associated with the given transaction. This method will block - * if another thread currently has access to the connection. - * - * @param transactionId - * a transaction ID - * @return the RepositoryConnection belonging to this transaction. - * @throws RepositoryException - * if no transaction with the given id is registered. - * @throws InterruptedException - * if the thread is interrupted while acquiring a lock on the transaction. + * @param transaction */ - public RepositoryConnection getTransactionConnection(UUID transactionId) - throws RepositoryException, InterruptedException - { - Lock txnLock = null; + public void deregister(Transaction transaction) { + synchronized (primaryCache) { - CacheEntry entry = primaryCache.getIfPresent(transactionId); + Transaction entry = primaryCache.getIfPresent(transaction.getID()); if (entry == null) { throw new RepositoryException( - "transaction with id " + transactionId.toString() + " not registered."); + "transaction with id " + transaction.getID().toString() + " not registered."); } - - txnLock = entry.getLock(); - } - - txnLock.lockInterruptibly(); - /* Another thread might have deregistered the transaction while we were acquiring the lock */ - final CacheEntry entry = primaryCache.getIfPresent(transactionId); - if (entry == null) { - throw new RepositoryException( - "transaction with id " + transactionId + " is no longer registered!"); - } - updateSecondaryCache(transactionId, entry); - - return entry.getConnection(); - } - - /** - * Unlocks the {@link RepositoryConnection} associated with the given transaction for use by other - * threads. If the transaction is no longer registered, this will method will exit silently. - * - * @param transactionId - * a transaction identifier. - */ - public void returnTransactionConnection(UUID transactionId) { - final CacheEntry entry = primaryCache.getIfPresent(transactionId); - if (entry != null) { - updateSecondaryCache(transactionId, entry); - final ReentrantLock txnLock = entry.getLock(); - if (txnLock.isHeldByCurrentThread()) { - txnLock.unlock(); + else { + primaryCache.invalidate(transaction.getID()); + secondaryCache.invalidate(transaction.getID()); + logger.debug("deregistered transaction {}", transaction.getID()); } } } @@ -258,20 +166,18 @@ public void returnTransactionConnection(UUID transactionId) { * Checks if the given transaction entry is still in the secondary cache (resetting its last access time * in the process) and if not reinserts it. * - * @param transactionId - * the id for the transaction to check - * @param entry - * the cache entry to insert if necessary. + * @param transaction + * the transaction to check */ - private void updateSecondaryCache(UUID transactionId, final CacheEntry entry) { + private void updateSecondaryCache(final Transaction transaction) { try { - secondaryCache.get(transactionId, new Callable() { + secondaryCache.get(transaction.getID(), new Callable() { @Override - public CacheEntry call() + public Transaction call() throws Exception { - return entry; + return transaction; } }); } diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/Transaction.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/Transaction.java new file mode 100644 index 00000000000..85a80bf4ef6 --- /dev/null +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/Transaction.java @@ -0,0 +1,475 @@ +/** + * Copyright (c) 2016 Eclipse RDF4J contributors. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + */ +package org.eclipse.rdf4j.http.server.repository.transaction; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.eclipse.rdf4j.IsolationLevel; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.vocabulary.SESAME; +import org.eclipse.rdf4j.query.BooleanQuery; +import org.eclipse.rdf4j.query.Dataset; +import org.eclipse.rdf4j.query.GraphQuery; +import org.eclipse.rdf4j.query.GraphQueryResult; +import org.eclipse.rdf4j.query.Query; +import org.eclipse.rdf4j.query.QueryLanguage; +import org.eclipse.rdf4j.query.TupleQuery; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.query.Update; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.RepositoryException; +import org.eclipse.rdf4j.repository.util.RDFInserter; +import org.eclipse.rdf4j.rio.ParserConfig; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.eclipse.rdf4j.rio.helpers.BasicParserSettings; + +/** + * A transaction encapsulates a single {@link Thread} and a {@link RepositoryConnection}, to enable executing + * all operations that are part of the transaction from a single, dedicated thread. This is necessary because + * {@link RepositoryConnection} is not guaranteed thread-safe and we may run into concurrency issues if we + * attempt to share it between the various HTTP Request worker threads. + * + * @author Jeen Broekstra + */ +class Transaction { + + private final UUID id; + + private final Repository rep; + + private final RepositoryConnection txnConnection; + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + private final List> futures = new ArrayList<>(); + + /** + * Create a new Transaction for the given {@link Repository}. + * + * @param repository + * the {@link Repository} on which to open a transaction. + * @throws InterruptedException + * if the transaction thread is interrupted while opening a connection. + * @throws ExecutionException + * if an error occurs while opening the connection. + */ + Transaction(Repository repository) + throws InterruptedException, ExecutionException + { + this.id = UUID.randomUUID(); + this.rep = repository; + this.txnConnection = getTransactionConnection(); + } + + /** + * The identifier of this transaction object. + * + * @return a {@link UUID} that identifies this Transaction. + */ + UUID getID() { + return id; + } + + + + /** + * Start the transaction. + * + * @param level + * the {@link IsolationLevel} to use for this transction. + * @throws InterruptedException + * if the transaction thread is interrupted + * @throws ExecutionException + * if an error occurs while starting the transaction. + */ + void begin(IsolationLevel level) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + txnConnection.begin(level); + return true; + }); + + futures.add(result); + result.get(); + } + + /** + * Rolls back all updates in the transaction. + * + * @throws ExecutionException + * @throws InterruptedException + */ + void rollback() + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + txnConnection.rollback(); + return true; + }); + + futures.add(result); + result.get(); + } + + /** + * @throws ExecutionException + * @throws InterruptedException + */ + void commit() + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + txnConnection.commit(); + return true; + }); + + futures.add(result); + + result.get(); + } + + /** + * Prepares a query for evaluation on this transaction. + * + * @param ql + * The {@link QueryLanguage query language} in which the query is formulated. + * @param query + * The query string. + * @param baseURI + * The base URI to resolve any relative URIs that are in the query against, can be null if + * the query does not contain any relative URIs. + * @return A query ready to be evaluated on this repository. + * @throws InterruptedException + * if the transaction thread is interrupted + * @throws ExecutionException + * if an error occurs while executing the operation. + */ + Query prepareQuery(QueryLanguage queryLn, String queryStr, String baseURI) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> txnConnection.prepareQuery(queryLn, queryStr, baseURI)); + + futures.add(result); + return result.get(); + } + + /** + * Evaluate a TupleQuery in this transaction and return the result. + * + * @param tQuery + * a {@link TupleQuery} prepared on this transaction. + * @return a {@link TupleQueryResult} + * @throws InterruptedException + * if the transaction thread is interrupted + * @throws ExecutionException + * if an error occurs while executing the operation. + */ + TupleQueryResult evaluate(TupleQuery tQuery) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> tQuery.evaluate()); + futures.add(result); + return result.get(); + } + + /** + * Evaluate a {@link GraphQuery} in this transaction and return the result. + * + * @param gQuery + * a {@link GraphQuery} prepared on this transaction. + * @return a {@link GraphQueryResult} + * @throws InterruptedException + * if the transaction thread is interrupted + * @throws ExecutionException + * if an error occurs while executing the operation. + */ + GraphQueryResult evaluate(GraphQuery gQuery) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> gQuery.evaluate()); + futures.add(result); + return result.get(); + } + + /** + * Evaluate a {@link BooleanQuery} in this transaction and return the result. + * + * @param bQuery + * a {@link BooleanQuery} prepared on this transaction. + * @return the query result as a boolean + * @throws InterruptedException + * if the transaction thread is interrupted + * @throws ExecutionException + * if an error occurs while executing the operation. + */ + boolean evaluate(BooleanQuery bQuery) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> bQuery.evaluate()); + futures.add(result); + return result.get(); + } + + /** + * @param subj + * @param pred + * @param obj + * @param useInferencing + * @param rdfWriter + * @param contexts + * @throws ExecutionException + * @throws InterruptedException + */ + void exportStatements(Resource subj, IRI pred, Value obj, boolean useInferencing, + RDFWriter rdfWriter, Resource... contexts) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + txnConnection.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts); + return true; + }); + + futures.add(result); + result.get(); + } + + /** + * Returns the number of (explicit) statements that are in the specified contexts in this transaction. + * + * @param contexts + * The context(s) to get the data from. Note that this parameter is a vararg and as such is + * optional. If no contexts are supplied the method operates on the entire repository. + * @return The number of explicit statements from the specified contexts in this transaction. + */ + long getSize(Resource[] contexts) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> txnConnection.size(contexts)); + futures.add(result); + return result.get(); + } + + /** + * Adds RDF data from an {@link InputStream} to the transaction. + * + * @param inputStream + * @param baseURI + * @param format + * @param contexts + * @throws ExecutionException + * @throws InterruptedException + */ + void add(InputStream inputStream, String baseURI, RDFFormat format, boolean preserveBNodes, + Resource... contexts) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + try { + if (preserveBNodes) { + // create a reconfigured parser + inserter instead of relying on standard + // repositoryconn add method. + RDFParser parser = Rio.createParser(format); + parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true); + RDFInserter inserter = new RDFInserter(txnConnection); + inserter.setPreserveBNodeIDs(true); + if (contexts.length > 0) { + inserter.enforceContext(contexts); + } + parser.setRDFHandler(inserter); + parser.parse(inputStream, baseURI); + } + else { + txnConnection.add(inputStream, baseURI, format, contexts); + } + return true; + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + + futures.add(result); + result.get(); + } + + /** + * @param contentType + * @param inputStream + * @param baseURI + * @throws ExecutionException + * @throws InterruptedException + */ + void delete(RDFFormat contentType, InputStream inputStream, String baseURI) + throws InterruptedException, ExecutionException + { + Future exception = executor.submit(() -> { + RDFParser parser = Rio.createParser(contentType, txnConnection.getValueFactory()); + + parser.setRDFHandler(new WildcardRDFRemover(txnConnection)); + parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true); + try { + parser.parse(inputStream, baseURI); + return true; + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + + futures.add(exception); + + exception.get(); + } + + /** + * @param queryLn + * @param sparqlUpdateString + * @param baseURI + * @param includeInferred + * @param dataset + * @param bindings + * @throws ExecutionException + * @throws InterruptedException + */ + void executeUpdate(QueryLanguage queryLn, String sparqlUpdateString, String baseURI, + boolean includeInferred, Dataset dataset, Map bindings) + throws InterruptedException, ExecutionException + { + Future result = executor.submit(() -> { + Update update = txnConnection.prepareUpdate(queryLn, sparqlUpdateString); + update.setIncludeInferred(includeInferred); + if (dataset != null) { + update.setDataset(dataset); + } + for (String bindingName : bindings.keySet()) { + update.setBinding(bindingName, bindings.get(bindingName)); + } + + update.execute(); + return true; + }); + + futures.add(result); + result.get(); + } + + boolean hasActiveOperations() { + for (Future future : futures) { + if (!future.isDone()) { + return true; + } + } + return false; + } + + /** + * Close this transaction. + * + * @throws InterruptedException + * @throws ExecutionException + */ + void close() + throws InterruptedException, ExecutionException + { + try { + Future result = executor.submit(() -> { + txnConnection.close(); + return true; + }); + + futures.add(result); + result.get(); + } + finally { + executor.shutdown(); + } + } + + private RepositoryConnection getTransactionConnection() + throws InterruptedException, ExecutionException + { + // create a new RepositoryConnection with correct parser settings + Future future = executor.submit(() -> { + RepositoryConnection conn = rep.getConnection(); + ParserConfig config = conn.getParserConfig(); + config.set(BasicParserSettings.PRESERVE_BNODE_IDS, true); + config.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES); + config.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS); + + return conn; + }); + + futures.add(future); + return future.get(); + } + + private static class WildcardRDFRemover extends AbstractRDFHandler { + + private final RepositoryConnection conn; + + public WildcardRDFRemover(RepositoryConnection conn) { + super(); + this.conn = conn; + } + + @Override + public void handleStatement(Statement st) + throws RDFHandlerException + { + Resource subject = SESAME.WILDCARD.equals(st.getSubject()) ? null : st.getSubject(); + IRI predicate = SESAME.WILDCARD.equals(st.getPredicate()) ? null : st.getPredicate(); + Value object = SESAME.WILDCARD.equals(st.getObject()) ? null : st.getObject(); + + // use the RepositoryConnection.clear operation if we're removing all statements + final boolean clearAllTriples = subject == null && predicate == null && object == null; + + try { + Resource context = st.getContext(); + if (context != null) { + if (clearAllTriples) { + conn.clear(context); + } + else { + conn.remove(subject, predicate, object, context); + } + } + else { + if (clearAllTriples) { + conn.clear(); + } + else { + conn.remove(subject, predicate, object); + } + } + } + catch (RepositoryException e) { + throw new RDFHandlerException(e); + } + } + + } +} diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionController.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionController.java index b8f1c9792d9..959b9683255 100644 --- a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionController.java +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionController.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -52,13 +53,11 @@ import org.eclipse.rdf4j.http.server.repository.QueryResultView; import org.eclipse.rdf4j.http.server.repository.RepositoryInterceptor; import org.eclipse.rdf4j.http.server.repository.TupleQueryResultView; -import org.eclipse.rdf4j.http.server.repository.statements.ExportStatementsView; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; -import org.eclipse.rdf4j.model.Statement; import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.ValueFactory; -import org.eclipse.rdf4j.model.vocabulary.SESAME; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.eclipse.rdf4j.query.BooleanQuery; import org.eclipse.rdf4j.query.GraphQuery; import org.eclipse.rdf4j.query.MalformedQueryException; @@ -68,23 +67,16 @@ import org.eclipse.rdf4j.query.QueryLanguage; import org.eclipse.rdf4j.query.TupleQuery; import org.eclipse.rdf4j.query.UnsupportedQueryLanguageException; -import org.eclipse.rdf4j.query.Update; import org.eclipse.rdf4j.query.UpdateExecutionException; import org.eclipse.rdf4j.query.impl.SimpleDataset; import org.eclipse.rdf4j.query.resultio.BooleanQueryResultWriterRegistry; import org.eclipse.rdf4j.query.resultio.TupleQueryResultWriterRegistry; import org.eclipse.rdf4j.repository.Repository; -import org.eclipse.rdf4j.repository.RepositoryConnection; import org.eclipse.rdf4j.repository.RepositoryException; -import org.eclipse.rdf4j.repository.util.RDFInserter; import org.eclipse.rdf4j.rio.RDFFormat; -import org.eclipse.rdf4j.rio.RDFHandlerException; -import org.eclipse.rdf4j.rio.RDFParser; import org.eclipse.rdf4j.rio.RDFWriterFactory; import org.eclipse.rdf4j.rio.RDFWriterRegistry; import org.eclipse.rdf4j.rio.Rio; -import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; -import org.eclipse.rdf4j.rio.helpers.BasicParserSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContextException; @@ -117,13 +109,13 @@ protected ModelAndView handleRequestInternal(HttpServletRequest request, HttpSer UUID transactionId = getTransactionID(request); logger.debug("transaction id: {}", transactionId); logger.debug("request content type: {}", request.getContentType()); - RepositoryConnection connection = ActiveTransactionRegistry.INSTANCE.getTransactionConnection( - transactionId); - if (connection == null) { - logger.warn("could not find connection for transaction id {}", transactionId); + Transaction transaction = ActiveTransactionRegistry.INSTANCE.getTransaction(transactionId); + + if (transaction == null) { + logger.warn("could not find transaction for transaction id {}", transactionId); throw new ClientHTTPException(SC_BAD_REQUEST, - "unable to find registerd connection for transaction id '" + transactionId + "'"); + "unable to find registerd transaction for transaction id '" + transactionId + "'"); } // if no action is specified in the request, it's a rollback (since it's @@ -138,7 +130,7 @@ protected ModelAndView handleRequestInternal(HttpServletRequest request, HttpSer // PUT is allowed. if ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)) { logger.info("{} txn query request", reqMethod); - result = processQuery(connection, transactionId, request, response); + result = processQuery(transaction, request, response); logger.info("{} txn query request finished", reqMethod); } else { @@ -149,7 +141,7 @@ protected ModelAndView handleRequestInternal(HttpServletRequest request, HttpSer case GET: if ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)) { logger.info("{} txn get/export statements request", reqMethod); - result = getExportStatementsResult(connection, transactionId, request, response); + result = getExportStatementsResult(transaction, request, response); logger.info("{} txn get/export statements request finished", reqMethod); } else { @@ -160,7 +152,7 @@ protected ModelAndView handleRequestInternal(HttpServletRequest request, HttpSer case SIZE: if ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)) { logger.info("{} txn size request", reqMethod); - result = getSize(connection, transactionId, request, response); + result = getSize(transaction, request, response); logger.info("{} txn size request finished", reqMethod); } else { @@ -169,38 +161,31 @@ protected ModelAndView handleRequestInternal(HttpServletRequest request, HttpSer } break; default: - // modification operations - we can process these and then - // immediately release the connection back to the registry. - try { - // TODO Action.ROLLBACK check is for backward compatibility with - // older 2.8.x releases only. It's not in the protocol spec. - if ("DELETE".equals(reqMethod) || (action.equals(Action.ROLLBACK) - && ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)))) - { - logger.info("transaction rollback"); - try { - connection.rollback(); - } - finally { - ActiveTransactionRegistry.INSTANCE.deregister(transactionId); - connection.close(); - } - result = new ModelAndView(EmptySuccessView.getInstance()); - logger.info("transaction rollback request finished."); - } - else if ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)) { - // TODO filter for appropriate PUT operations - logger.info("{} txn operation", reqMethod); - result = processModificationOperation(connection, action, request, response); - logger.info("PUT txn operation request finished."); + // TODO Action.ROLLBACK check is for backward compatibility with + // older 2.8.x releases only. It's not in the protocol spec. + if ("DELETE".equals(reqMethod) || (action.equals(Action.ROLLBACK) + && ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)))) + { + logger.info("transaction rollback"); + try { + transaction.rollback(); } - else { - throw new ClientHTTPException(HttpServletResponse.SC_METHOD_NOT_ALLOWED, - "Method not allowed: " + reqMethod); + finally { + ActiveTransactionRegistry.INSTANCE.deregister(transaction); + transaction.close(); } + result = new ModelAndView(EmptySuccessView.getInstance()); + logger.info("transaction rollback request finished."); } - finally { - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(transactionId); + else if ("PUT".equals(reqMethod) || METHOD_POST.equals(reqMethod)) { + // TODO filter for appropriate PUT operations + logger.info("{} txn operation", reqMethod); + result = processModificationOperation(transaction, action, request, response); + logger.info("PUT txn operation request finished."); + } + else { + throw new ClientHTTPException(HttpServletResponse.SC_METHOD_NOT_ALLOWED, + "Method not allowed: " + reqMethod); } break; } @@ -235,7 +220,7 @@ private UUID getTransactionID(HttpServletRequest request) return txnID; } - private ModelAndView processModificationOperation(RepositoryConnection conn, Action action, + private ModelAndView processModificationOperation(Transaction transaction, Action action, HttpServletRequest request, HttpServletResponse response) throws IOException, HTTPException { @@ -249,50 +234,31 @@ private ModelAndView processModificationOperation(RepositoryConnection conn, Act } final Resource[] contexts = ProtocolUtil.parseContextParam(request, CONTEXT_PARAM_NAME, - conn.getValueFactory()); + SimpleValueFactory.getInstance()); final boolean preserveNodeIds = ProtocolUtil.parseBooleanParam(request, Protocol.PRESERVE_BNODE_ID_PARAM_NAME, false); try { + RDFFormat format = null; switch (action) { case ADD: - final RDFFormat format = Rio.getParserFormatForMIMEType( - request.getContentType()).orElseThrow( - Rio.unsupportedFormat(request.getContentType())); - - if (preserveNodeIds) { - // create a reconfigured parser + inserter instead of relying on standard - // repositoryconn add method. - RDFParser parser = Rio.createParser(format); - parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true); - RDFInserter inserter = new RDFInserter(conn); - inserter.setPreserveBNodeIDs(true); - if (contexts.length > 0) { - inserter.enforceContext(contexts); - } - parser.setRDFHandler(inserter); - parser.parse(request.getInputStream(), baseURI); - } - else { - conn.add(request.getInputStream(), baseURI, format, contexts); - } + format = Rio.getParserFormatForMIMEType(request.getContentType()).orElseThrow( + Rio.unsupportedFormat(request.getContentType())); + transaction.add(request.getInputStream(), baseURI, format, preserveNodeIds, contexts); break; case DELETE: - RDFParser parser = Rio.createParser( - Rio.getParserFormatForMIMEType(request.getContentType()).orElseThrow( - Rio.unsupportedFormat(request.getContentType())), - conn.getValueFactory()); - parser.setRDFHandler(new WildcardRDFRemover(conn)); - parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true); - parser.parse(request.getInputStream(), baseURI); + format = Rio.getParserFormatForMIMEType(request.getContentType()).orElseThrow( + Rio.unsupportedFormat(request.getContentType())); + transaction.delete(format, request.getInputStream(), baseURI); + break; case UPDATE: - return getSparqlUpdateResult(conn, request, response); + return getSparqlUpdateResult(transaction, request, response); case COMMIT: - conn.commit(); - conn.close(); - ActiveTransactionRegistry.INSTANCE.deregister(getTransactionID(request)); + transaction.commit(); + transaction.close(); + ActiveTransactionRegistry.INSTANCE.deregister(transaction); break; default: logger.warn("transaction modification action '{}' not recognized", action); @@ -313,39 +279,33 @@ private ModelAndView processModificationOperation(RepositoryConnection conn, Act } } - private ModelAndView getSize(RepositoryConnection conn, UUID txnId, HttpServletRequest request, + private ModelAndView getSize(Transaction transaction, HttpServletRequest request, HttpServletResponse response) throws HTTPException { - try { - ProtocolUtil.logRequestParameters(request); + ProtocolUtil.logRequestParameters(request); - Map model = new HashMap(); - final boolean headersOnly = METHOD_HEAD.equals(request.getMethod()); + Map model = new HashMap(); + final boolean headersOnly = METHOD_HEAD.equals(request.getMethod()); - if (!headersOnly) { - Repository repository = RepositoryInterceptor.getRepository(request); + if (!headersOnly) { + Repository repository = RepositoryInterceptor.getRepository(request); - ValueFactory vf = repository.getValueFactory(); - Resource[] contexts = ProtocolUtil.parseContextParam(request, Protocol.CONTEXT_PARAM_NAME, - vf); + ValueFactory vf = repository.getValueFactory(); + Resource[] contexts = ProtocolUtil.parseContextParam(request, Protocol.CONTEXT_PARAM_NAME, vf); - long size = -1; + long size = -1; - try { - size = conn.size(contexts); - } - catch (RepositoryException e) { - throw new ServerHTTPException("Repository error: " + e.getMessage(), e); - } - model.put(SimpleResponseView.CONTENT_KEY, String.valueOf(size)); + try { + size = transaction.getSize(contexts); } - - return new ModelAndView(SimpleResponseView.getInstance(), model); - } - finally { - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(txnId); + catch (RepositoryException | InterruptedException | ExecutionException e) { + throw new ServerHTTPException("Repository error: " + e.getMessage(), e); + } + model.put(SimpleResponseView.CONTENT_KEY, String.valueOf(size)); } + + return new ModelAndView(SimpleResponseView.getInstance(), model); } /** @@ -353,13 +313,13 @@ private ModelAndView getSize(RepositoryConnection conn, UUID txnId, HttpServletR * * @return a model and view for exporting the statements. */ - private ModelAndView getExportStatementsResult(RepositoryConnection conn, UUID txnId, - HttpServletRequest request, HttpServletResponse response) + private ModelAndView getExportStatementsResult(Transaction transaction, HttpServletRequest request, + HttpServletResponse response) throws ClientHTTPException { ProtocolUtil.logRequestParameters(request); - ValueFactory vf = conn.getValueFactory(); + ValueFactory vf = SimpleValueFactory.getInstance(); Resource subj = ProtocolUtil.parseResourceParam(request, SUBJECT_PARAM_NAME, vf); IRI pred = ProtocolUtil.parseURIParam(request, PREDICATE_PARAM_NAME, vf); @@ -371,16 +331,16 @@ private ModelAndView getExportStatementsResult(RepositoryConnection conn, UUID t RDFWriterRegistry.getInstance()); Map model = new HashMap(); - model.put(ExportStatementsView.SUBJECT_KEY, subj); - model.put(ExportStatementsView.PREDICATE_KEY, pred); - model.put(ExportStatementsView.OBJECT_KEY, obj); - model.put(ExportStatementsView.CONTEXTS_KEY, contexts); - model.put(ExportStatementsView.USE_INFERENCING_KEY, Boolean.valueOf(useInferencing)); - model.put(ExportStatementsView.FACTORY_KEY, rdfWriterFactory); - model.put(ExportStatementsView.HEADERS_ONLY, METHOD_HEAD.equals(request.getMethod())); - model.put(ExportStatementsView.CONNECTION_KEY, conn); - model.put(ExportStatementsView.TRANSACTION_ID_KEY, txnId); - return new ModelAndView(ExportStatementsView.getInstance(), model); + model.put(TransactionExportStatementsView.SUBJECT_KEY, subj); + model.put(TransactionExportStatementsView.PREDICATE_KEY, pred); + model.put(TransactionExportStatementsView.OBJECT_KEY, obj); + model.put(TransactionExportStatementsView.CONTEXTS_KEY, contexts); + model.put(TransactionExportStatementsView.USE_INFERENCING_KEY, Boolean.valueOf(useInferencing)); + model.put(TransactionExportStatementsView.FACTORY_KEY, rdfWriterFactory); + model.put(TransactionExportStatementsView.HEADERS_ONLY, METHOD_HEAD.equals(request.getMethod())); + + model.put(TransactionExportStatementsView.TRANSACTION_KEY, transaction); + return new ModelAndView(TransactionExportStatementsView.getInstance(), model); } /** @@ -388,7 +348,7 @@ private ModelAndView getExportStatementsResult(RepositoryConnection conn, UUID t * {@link QueryResultView} will take care of correctly releasing the connection back to the * {@link ActiveTransactionRegistry}, after fully rendering the query result for sending over the wire. */ - private ModelAndView processQuery(RepositoryConnection conn, UUID txnId, HttpServletRequest request, + private ModelAndView processQuery(Transaction txn, HttpServletRequest request, HttpServletResponse response) throws IOException, HTTPException { @@ -403,31 +363,31 @@ private ModelAndView processQuery(RepositoryConnection conn, UUID txnId, HttpSer queryStr = request.getParameter(QUERY_PARAM_NAME); } - Query query = getQuery(conn, queryStr, request, response); - View view; Object queryResult; FileFormatServiceRegistry registry; try { + Query query = getQuery(txn, queryStr, request, response); + if (query instanceof TupleQuery) { TupleQuery tQuery = (TupleQuery)query; - queryResult = tQuery.evaluate(); + queryResult = txn.evaluate(tQuery); registry = TupleQueryResultWriterRegistry.getInstance(); view = TupleQueryResultView.getInstance(); } else if (query instanceof GraphQuery) { GraphQuery gQuery = (GraphQuery)query; - queryResult = gQuery.evaluate(); + queryResult = txn.evaluate(gQuery); registry = RDFWriterRegistry.getInstance(); view = GraphQueryResultView.getInstance(); } else if (query instanceof BooleanQuery) { BooleanQuery bQuery = (BooleanQuery)query; - queryResult = bQuery.evaluate(); + queryResult = txn.evaluate(bQuery); registry = BooleanQueryResultWriterRegistry.getInstance(); view = BooleanQueryResultView.getInstance(); } @@ -436,14 +396,12 @@ else if (query instanceof BooleanQuery) { "Unsupported query type: " + query.getClass().getName()); } } - catch (QueryInterruptedException e) { + catch (QueryInterruptedException | InterruptedException | ExecutionException e) { logger.info("Query interrupted", e); - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(txnId); - throw new ServerHTTPException(SC_SERVICE_UNAVAILABLE, "Query evaluation took too long"); + throw new ServerHTTPException(SC_SERVICE_UNAVAILABLE, "Query execution interrupted"); } catch (QueryEvaluationException e) { logger.info("Query evaluation error", e); - ActiveTransactionRegistry.INSTANCE.returnTransactionConnection(txnId); if (e.getCause() != null && e.getCause() instanceof HTTPException) { // custom signal from the backend, throw as HTTPException // directly (see SES-1016). @@ -461,13 +419,13 @@ else if (query instanceof BooleanQuery) { model.put(QueryResultView.FACTORY_KEY, factory); model.put(QueryResultView.HEADERS_ONLY, false); // TODO needed for HEAD // requests. - model.put(QueryResultView.TRANSACTION_ID_KEY, txnId); + model.put(QueryResultView.TRANSACTION_ID_KEY, txn.getID()); return new ModelAndView(view, model); } - private Query getQuery(RepositoryConnection repositoryCon, String queryStr, HttpServletRequest request, + private Query getQuery(Transaction txn, String queryStr, HttpServletRequest request, HttpServletResponse response) - throws IOException, ClientHTTPException + throws IOException, ClientHTTPException, InterruptedException, ExecutionException { Query result = null; @@ -514,7 +472,7 @@ private Query getQuery(RepositoryConnection repositoryCon, String queryStr, Http try { IRI uri = null; if (!"null".equals(defaultGraphURI)) { - uri = repositoryCon.getValueFactory().createIRI(defaultGraphURI); + uri = SimpleValueFactory.getInstance().createIRI(defaultGraphURI); } dataset.addDefaultGraph(uri); } @@ -530,7 +488,7 @@ private Query getQuery(RepositoryConnection repositoryCon, String queryStr, Http try { IRI uri = null; if (!"null".equals(namedGraphURI)) { - uri = repositoryCon.getValueFactory().createIRI(namedGraphURI); + uri = SimpleValueFactory.getInstance().createIRI(namedGraphURI); } dataset.addNamedGraph(uri); } @@ -543,12 +501,11 @@ private Query getQuery(RepositoryConnection repositoryCon, String queryStr, Http } try { - result = repositoryCon.prepareQuery(queryLn, queryStr, baseURI); - + result = txn.prepareQuery(queryLn, queryStr, baseURI); result.setIncludeInferred(includeInferred); if (maxQueryTime > 0) { - result.setMaxQueryTime(maxQueryTime); + result.setMaxExecutionTime(maxQueryTime); } if (dataset != null) { @@ -567,7 +524,7 @@ private Query getQuery(RepositoryConnection repositoryCon, String queryStr, Http { String bindingName = parameterName.substring(BINDING_PREFIX.length()); Value bindingValue = ProtocolUtil.parseValueParam(request, parameterName, - repositoryCon.getValueFactory()); + SimpleValueFactory.getInstance()); result.setBinding(bindingName, bindingValue); } } @@ -588,7 +545,7 @@ private Query getQuery(RepositoryConnection repositoryCon, String queryStr, Http return result; } - private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServletRequest request, + private ModelAndView getSparqlUpdateResult(Transaction transaction, HttpServletRequest request, HttpServletResponse response) throws ServerHTTPException, ClientHTTPException, HTTPException { @@ -644,7 +601,7 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle try { IRI uri = null; if (!"null".equals(graphURI)) { - uri = conn.getValueFactory().createIRI(graphURI); + uri = SimpleValueFactory.getInstance().createIRI(graphURI); } dataset.addDefaultRemoveGraph(uri); } @@ -660,7 +617,7 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle try { IRI uri = null; if (!"null".equals(graphURI)) { - uri = conn.getValueFactory().createIRI(graphURI); + uri = SimpleValueFactory.getInstance().createIRI(graphURI); } dataset.setDefaultInsertGraph(uri); } @@ -675,7 +632,7 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle try { IRI uri = null; if (!"null".equals(defaultGraphURI)) { - uri = conn.getValueFactory().createIRI(defaultGraphURI); + uri = SimpleValueFactory.getInstance().createIRI(defaultGraphURI); } dataset.addDefaultGraph(uri); } @@ -691,7 +648,7 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle try { IRI uri = null; if (!"null".equals(namedGraphURI)) { - uri = conn.getValueFactory().createIRI(namedGraphURI); + uri = SimpleValueFactory.getInstance().createIRI(namedGraphURI); } dataset.addNamedGraph(uri); } @@ -703,18 +660,11 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle } try { - Update update = conn.prepareUpdate(queryLn, sparqlUpdateString, baseURI); - - update.setIncludeInferred(includeInferred); - - if (dataset != null) { - update.setDataset(dataset); - } - // determine if any variable bindings have been set on this update. @SuppressWarnings("unchecked") Enumeration parameterNames = request.getParameterNames(); + Map bindings = new HashMap<>(); while (parameterNames.hasMoreElements()) { String parameterName = parameterNames.nextElement(); @@ -723,16 +673,17 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle { String bindingName = parameterName.substring(BINDING_PREFIX.length()); Value bindingValue = ProtocolUtil.parseValueParam(request, parameterName, - conn.getValueFactory()); - update.setBinding(bindingName, bindingValue); + SimpleValueFactory.getInstance()); + bindings.put(bindingName, bindingValue); } } - update.execute(); + transaction.executeUpdate(queryLn, sparqlUpdateString, baseURI, includeInferred, dataset, + bindings); return new ModelAndView(EmptySuccessView.getInstance()); } - catch (UpdateExecutionException e) { + catch (UpdateExecutionException | InterruptedException | ExecutionException e) { if (e.getCause() != null && e.getCause() instanceof HTTPException) { // custom signal from the backend, throw as HTTPException directly // (see SES-1016). @@ -758,49 +709,4 @@ private ModelAndView getSparqlUpdateResult(RepositoryConnection conn, HttpServle } } - private static class WildcardRDFRemover extends AbstractRDFHandler { - - private final RepositoryConnection conn; - - public WildcardRDFRemover(RepositoryConnection conn) { - super(); - this.conn = conn; - } - - @Override - public void handleStatement(Statement st) - throws RDFHandlerException - { - Resource subject = SESAME.WILDCARD.equals(st.getSubject()) ? null : st.getSubject(); - IRI predicate = SESAME.WILDCARD.equals(st.getPredicate()) ? null : st.getPredicate(); - Value object = SESAME.WILDCARD.equals(st.getObject()) ? null : st.getObject(); - - // use the RepositoryConnection.clear operation if we're removing all statements - final boolean clearAllTriples = subject == null && predicate == null && object == null; - - try { - Resource context = st.getContext(); - if (context != null) { - if (clearAllTriples) { - conn.clear(context); - } - else { - conn.remove(subject, predicate, object, context); - } - } - else { - if (clearAllTriples) { - conn.clear(); - } - else { - conn.remove(subject, predicate, object); - } - } - } - catch (RepositoryException e) { - throw new RDFHandlerException(e); - } - } - - } } diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionExportStatementsView.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionExportStatementsView.java new file mode 100644 index 00000000000..b3d0ef8a010 --- /dev/null +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionExportStatementsView.java @@ -0,0 +1,118 @@ +/******************************************************************************* + * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + *******************************************************************************/ +package org.eclipse.rdf4j.http.server.repository.transaction; + +import static javax.servlet.http.HttpServletResponse.SC_OK; + +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.rdf4j.http.server.ServerHTTPException; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.RDFWriterFactory; +import org.springframework.web.servlet.View; + +/** + * View used to export statements as part of a transaction. Renders the statements as RDF using a + * serialization specified using a parameter or Accept header. + * + * @author Herko ter Horst + * @author Jeen Broekstra + */ +public class TransactionExportStatementsView implements View { + + public static final String SUBJECT_KEY = "subject"; + + public static final String PREDICATE_KEY = "predicate"; + + public static final String OBJECT_KEY = "object"; + + public static final String CONTEXTS_KEY = "contexts"; + + public static final String USE_INFERENCING_KEY = "useInferencing"; + + public static final String TRANSACTION_KEY = "transaction"; + + public static final String FACTORY_KEY = "factory"; + + public static final String HEADERS_ONLY = "headersOnly"; + + private static final TransactionExportStatementsView INSTANCE = new TransactionExportStatementsView(); + + public static TransactionExportStatementsView getInstance() { + return INSTANCE; + } + + private TransactionExportStatementsView() { + } + + public String getContentType() { + return null; + } + + @SuppressWarnings("rawtypes") + public void render(Map model, HttpServletRequest request, HttpServletResponse response) + throws Exception + { + Resource subj = (Resource)model.get(SUBJECT_KEY); + IRI pred = (IRI)model.get(PREDICATE_KEY); + Value obj = (Value)model.get(OBJECT_KEY); + Resource[] contexts = (Resource[])model.get(CONTEXTS_KEY); + boolean useInferencing = (Boolean)model.get(USE_INFERENCING_KEY); + Transaction transaction = (Transaction)model.get(TRANSACTION_KEY); + + boolean headersOnly = (Boolean)model.get(HEADERS_ONLY); + + RDFWriterFactory rdfWriterFactory = (RDFWriterFactory)model.get(FACTORY_KEY); + + RDFFormat rdfFormat = rdfWriterFactory.getRDFFormat(); + + try { + try (OutputStream out = response.getOutputStream()) { + RDFWriter rdfWriter = rdfWriterFactory.getWriter(out); + + response.setStatus(SC_OK); + + String mimeType = rdfFormat.getDefaultMIMEType(); + if (rdfFormat.hasCharset()) { + Charset charset = rdfFormat.getCharset(); + mimeType += "; charset=" + charset.name(); + } + response.setContentType(mimeType); + + String filename = "statements"; + if (rdfFormat.getDefaultFileExtension() != null) { + filename += "." + rdfFormat.getDefaultFileExtension(); + } + response.setHeader("Content-Disposition", "attachment; filename=" + filename); + + if (!headersOnly) { + transaction.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts); + } + } + } + catch (RDFHandlerException e) { + throw new ServerHTTPException("Serialization error: " + e.getMessage(), e); + } + catch (ExecutionException | InterruptedException e) { + throw new ServerHTTPException("Repository error: " + e.getMessage(), e); + } + } + +} diff --git a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionStartController.java b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionStartController.java index caf07638081..0280810502f 100644 --- a/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionStartController.java +++ b/core/http/server-spring/src/main/java/org/eclipse/rdf4j/http/server/repository/transaction/TransactionStartController.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -98,16 +99,13 @@ private ModelAndView startTransaction(Repository repository, HttpServletRequest } try { - RepositoryConnection conn = repository.getConnection(); + Transaction txn = new Transaction(repository); + txn.begin(isolationLevel); - ParserConfig config = conn.getParserConfig(); - config.set(BasicParserSettings.PRESERVE_BNODE_IDS, true); - config.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES); - config.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS); - conn.begin(isolationLevel); - UUID txnId = UUID.randomUUID(); - - ActiveTransactionRegistry.INSTANCE.register(txnId, conn); + UUID txnId = txn.getID(); + + ActiveTransactionRegistry.INSTANCE.register(txn); + model.put(SimpleResponseView.SC_KEY, SC_CREATED); final StringBuffer txnURL = request.getRequestURL(); txnURL.append("/" + txnId.toString()); @@ -116,7 +114,7 @@ private ModelAndView startTransaction(Repository repository, HttpServletRequest model.put(SimpleResponseView.CUSTOM_HEADERS_KEY, customHeaders); return new ModelAndView(SimpleResponseView.getInstance(), model); } - catch (RepositoryException e) { + catch (RepositoryException | InterruptedException | ExecutionException e) { throw new ServerHTTPException("Transaction start error: " + e.getMessage(), e); } } diff --git a/core/http/server-spring/src/test/java/org/eclipse/rdf4j/http/server/repository/transaction/TestActiveTransactionRegistry.java b/core/http/server-spring/src/test/java/org/eclipse/rdf4j/http/server/repository/transaction/TestActiveTransactionRegistry.java index 8d29d960bf8..6d9bd6a9ad0 100644 --- a/core/http/server-spring/src/test/java/org/eclipse/rdf4j/http/server/repository/transaction/TestActiveTransactionRegistry.java +++ b/core/http/server-spring/src/test/java/org/eclipse/rdf4j/http/server/repository/transaction/TestActiveTransactionRegistry.java @@ -12,6 +12,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; +import org.eclipse.rdf4j.repository.Repository; import org.eclipse.rdf4j.repository.RepositoryConnection; import org.eclipse.rdf4j.repository.RepositoryException; import org.junit.Before; @@ -26,7 +27,7 @@ public class TestActiveTransactionRegistry { private ActiveTransactionRegistry registry; - private RepositoryConnection conn; + private Repository repository; private UUID txnId1; @@ -41,120 +42,8 @@ public void setUp() { System.setProperty(ActiveTransactionRegistry.CACHE_TIMEOUT_PROPERTY, "1"); registry = ActiveTransactionRegistry.INSTANCE; - conn = Mockito.mock(RepositoryConnection.class); - txnId1 = UUID.randomUUID(); - txnId2 = UUID.randomUUID(); - - } - - @Test - public void testTimeoutRepeatedAccess() - throws Exception - { - registry.register(txnId2, conn); - - int count = 0; - while (count++ < 2) { - logger.debug("pass {}", count); - registry.getTransactionConnection(txnId2); - Thread.sleep(1200); - registry.returnTransactionConnection(txnId2); - } - - registry.deregister(txnId2); - try { - registry.getTransactionConnection(txnId2); - fail("should be deregistered"); - } - catch (RepositoryException e) { - // fall through, expected - } + repository = Mockito.mock(Repository.class); } - @Test - public void testMultithreadedAccess() { - - CountDownLatch txn1registered = new CountDownLatch(1); - - CountDownLatch done = new CountDownLatch(2); - - Runnable r1 = new Runnable() { - - @Override - public void run() { - registry.register(txnId1, conn); - txn1registered.countDown(); - - try { - registry.getTransactionConnection(txnId1); - Thread.sleep(700); - registry.returnTransactionConnection(txnId1); - - done.countDown(); - } - catch (RepositoryException | InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; - - Runnable r2 = new Runnable() { - - @Override - public void run() { - try { - txn1registered.await(); - } - catch (InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try { - registry.getTransactionConnection(txnId1); - Thread.sleep(500); - registry.returnTransactionConnection(txnId1); - - done.countDown(); - } - catch (RepositoryException | InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; - - Runnable r3 = new Runnable() { - - @Override - public void run() { - try { - done.await(); - registry.deregister(txnId1); - } - catch (InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; - - Thread t1 = new Thread(r1, "r1"); - Thread t2 = new Thread(r2, "r2"); - Thread t3 = new Thread(r3, "r3"); - - t3.start(); - t2.start(); - t1.start(); - - try { - t3.join(); - } - catch (InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } }