diff --git a/modeshape-jca-rar/src/main/resources/META-INF/ra.xml b/modeshape-jca-rar/src/main/resources/META-INF/ra.xml index a3b561026a..5dd04c8810 100644 --- a/modeshape-jca-rar/src/main/resources/META-INF/ra.xml +++ b/modeshape-jca-rar/src/main/resources/META-INF/ra.xml @@ -49,7 +49,7 @@ javax.jcr.Session org.modeshape.jca.JcrSessionHandle - XATransaction + LocalTransaction false diff --git a/modeshape-jca/src/main/java/org/modeshape/jca/JcrLocalTransaction.java b/modeshape-jca/src/main/java/org/modeshape/jca/JcrLocalTransaction.java new file mode 100644 index 0000000000..49e0e3b387 --- /dev/null +++ b/modeshape-jca/src/main/java/org/modeshape/jca/JcrLocalTransaction.java @@ -0,0 +1,77 @@ +/* + * ModeShape (http://www.modeshape.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.modeshape.jca; + +import javax.resource.ResourceException; +import javax.resource.spi.LocalTransaction; +import javax.resource.spi.LocalTransactionException; +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.SystemException; +import org.modeshape.common.annotation.Immutable; +import org.modeshape.jcr.txn.Transactions; + +/** + * ModeShape's JCA implementation of a {@link javax.resource.spi.LocalTransaction}. + * + * @author Horia Chiorean (hchiorea@redhat.com) + */ +@Immutable +public final class JcrLocalTransaction implements LocalTransaction { + + private final Transactions transactions; + + protected JcrLocalTransaction(Transactions transactions) { + this.transactions = transactions; + } + + @Override + public void begin() throws ResourceException { + try { + transactions.begin(); + } catch (NotSupportedException | SystemException e) { + throw new LocalTransactionException(e); + } + } + + @Override + public void commit() throws ResourceException { + try { + final Transactions.Transaction transaction = transactions.currentModeShapeTransaction(); + if (transaction == null) { + throw new LocalTransactionException("A local transaction does not exist"); + } + transaction.commit(); + } catch (RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException e) { + throw new LocalTransactionException(e); + } + } + + @Override + public void rollback() throws ResourceException { + final Transactions.Transaction transaction = transactions.currentModeShapeTransaction(); + if (transaction == null) { + throw new LocalTransactionException("A local transaction does not exist"); + } + try { + transaction.rollback(); + } catch (SystemException e) { + throw new LocalTransactionException(e); + } + } +} diff --git a/modeshape-jca/src/main/java/org/modeshape/jca/JcrManagedConnection.java b/modeshape-jca/src/main/java/org/modeshape/jca/JcrManagedConnection.java index 6798a53b3d..8b0c0aee9d 100644 --- a/modeshape-jca/src/main/java/org/modeshape/jca/JcrManagedConnection.java +++ b/modeshape-jca/src/main/java/org/modeshape/jca/JcrManagedConnection.java @@ -30,8 +30,9 @@ import javax.resource.spi.ManagedConnectionMetaData; import javax.security.auth.Subject; import javax.transaction.xa.XAResource; - import org.modeshape.common.logging.Logger; +import org.modeshape.jcr.JcrSession; +import org.modeshape.jcr.txn.Transactions; /** @@ -56,7 +57,8 @@ public class JcrManagedConnection implements ManagedConnection { private final List listeners = new CopyOnWriteArrayList(); private final JcrConnectionRequestInfo cri; - private Session session; + private JcrSession session; + private Transactions transactions; // Handles. private final List handles = new CopyOnWriteArrayList(); @@ -76,6 +78,7 @@ public JcrManagedConnection( JcrManagedConnectionFactory mcf, // init repository and open session this.session = openSession(); + this.transactions = session.getRepository().transactions(); } /** @@ -120,11 +123,11 @@ private void removeHandle( JcrSessionHandle handle ) { * @return new JCR session handle object. * @throws ResourceException if there is an error opening the session */ - private Session openSession() throws ResourceException { + private JcrSession openSession() throws ResourceException { try { Repository repo = mcf.getRepository(); Session s = repo.login(cri.getCredentials(), cri.getWorkspace()); - return s; + return (JcrSession) s; } catch (RepositoryException e) { throw new ResourceException("Failed to create session: " + e.getMessage(), e); } @@ -172,6 +175,7 @@ public void associateConnection( Object connection ) throws ResourceException { public void cleanup() throws ResourceException { this.session.logout(); this.session = openSession(); + this.transactions = session.getRepository().transactions(); this.handles.clear(); } @@ -229,7 +233,7 @@ protected void closeHandle( JcrSessionHandle handle ) { /** * Gets the log writer for this ManagedConnection instance. * - * @return Character ourput stream associated with this Managed-Connection instance + * @return Character output stream associated with this Managed-Connection instance * @throws ResourceException generic exception if operation fails */ @Override @@ -256,7 +260,7 @@ public void setLogWriter( PrintWriter out ) throws ResourceException { */ @Override public LocalTransaction getLocalTransaction() throws ResourceException { - return null; + return new JcrLocalTransaction(transactions); } /** @@ -267,7 +271,7 @@ public LocalTransaction getLocalTransaction() throws ResourceException { */ @Override public XAResource getXAResource() throws ResourceException { - return (XAResource)session; + throw new UnsupportedOperationException("ModeShape 5 does not support XA"); } /** diff --git a/modeshape-jca/src/test/resources/ironjacamar.xml b/modeshape-jca/src/test/resources/ironjacamar.xml index 3ae23ed827..5930d28650 100644 --- a/modeshape-jca/src/test/resources/ironjacamar.xml +++ b/modeshape-jca/src/test/resources/ironjacamar.xml @@ -27,7 +27,7 @@ xsi:schemaLocation="http://www.jboss.org/ironjacamar/schema http://www.jboss.org/ironjacamar/schema/ironjacamar_1_0.xsd"> - XATransaction + LocalTransaction diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java index d20784d88a..195d76f7b1 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java @@ -103,12 +103,10 @@ import org.modeshape.jcr.cache.NodeCache; import org.modeshape.jcr.cache.NodeKey; import org.modeshape.jcr.cache.RepositoryCache; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; import org.modeshape.jcr.cache.WorkspaceNotFoundException; import org.modeshape.jcr.cache.document.DocumentStore; import org.modeshape.jcr.cache.document.LocalDocumentStore; -import org.modeshape.jcr.cache.document.TransactionalWorkspaceCaches; import org.modeshape.jcr.clustering.ClusteringService; import org.modeshape.jcr.federation.FederatedDocumentStore; import org.modeshape.jcr.journal.ChangeJournal; @@ -420,6 +418,10 @@ protected final boolean doShutdown(boolean rollback) { return true; } + public Transactions transactions() { + return runningState().transactions; + } + protected final IndexManager getIndexManager() { return runningState().queryManager().getIndexManager(); } @@ -685,13 +687,8 @@ public synchronized JcrSession login( final Credentials credentials, SecurityContext securityContext = sessionContext.getSecurityContext(); boolean writable = JcrSession.hasRole(securityContext, ModeShapeRoles.READWRITE, repoName, workspaceName) || JcrSession.hasRole(securityContext, ModeShapeRoles.ADMIN, repoName, workspaceName); - JcrSession session = null; - if (running.useXaSessions()) { - session = new JcrXaSession(this, workspaceName, sessionContext, attributes, !writable); - } else { - session = new JcrSession(this, workspaceName, sessionContext, attributes, !writable); - } - + JcrSession session = new JcrSession(this, workspaceName, sessionContext, attributes, !writable); + // Need to make sure that the user has access to this session session.checkWorkspacePermission(workspaceName, ModeShapePermissions.READ); running.addSession(session, false); @@ -931,8 +928,8 @@ protected class RunningState { private final ExecutionContext context; private final ExecutionContext internalWorkerContext; private final ReadWriteLock activeSessionLock = new ReentrantReadWriteLock(); - private final WeakHashMap activeSessions = new WeakHashMap(); - private final WeakHashMap internalSessions = new WeakHashMap(); + private final WeakHashMap activeSessions = new WeakHashMap<>(); + private final WeakHashMap internalSessions = new WeakHashMap<>(); private final RepositoryStatistics statistics; private final RepositoryStatisticsBean mbean; private final BinaryStore binaryStore; @@ -944,7 +941,6 @@ protected class RunningState { private final TextExtractors extractors; private final ChangeBus changeBus; private final ExecutorService changeDispatchingQueue; - private final boolean useXaSessions; private final MimeTypeDetector mimeTypeDetector; private final BackupService backupService; private final InitialContentImporter initialContentImporter; @@ -1164,8 +1160,6 @@ protected RunningState( JcrRepository.RunningState other, this.statistics.set(ValueMetric.WORKSPACE_COUNT, cache.getWorkspaceNames().size()); } - this.useXaSessions = this.transactions instanceof SynchronizedTransactions; - if (other != null && !change.securityChanged) { this.authenticators = other.authenticators; this.anonymousCredentialsIfSuppliedCredentialsFail = other.anonymousCredentialsIfSuppliedCredentialsFail; @@ -1383,10 +1377,6 @@ protected final Sequencers sequencers() { return sequencers; } - protected final boolean useXaSessions() { - return useXaSessions; - } - final String name() { return repositoryName(); } @@ -1878,14 +1868,12 @@ void resumeExistingUserTransaction() throws SystemException { } protected class JcrRepositoryEnvironment implements RepositoryEnvironment { - private final TransactionalWorkspaceCaches transactionalWorkspaceCacheFactory; private final Transactions transactions; private final String journalId; protected JcrRepositoryEnvironment( Transactions transactions, String journalId ) { this.transactions = transactions; - this.transactionalWorkspaceCacheFactory = new TransactionalWorkspaceCaches(transactions); this.journalId = journalId; } @@ -1894,11 +1882,6 @@ public Transactions getTransactions() { return transactions; } - @Override - public TransactionalWorkspaceCaches getTransactionalWorkspaceCacheFactory() { - return transactionalWorkspaceCacheFactory; - } - @Override public String journalId() { return journalId; diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrXaSession.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrXaSession.java deleted file mode 100644 index 6478dd9725..0000000000 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/JcrXaSession.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ModeShape (http://www.modeshape.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.modeshape.jcr; - -import java.util.Map; -import javax.transaction.Synchronization; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import org.infinispan.AdvancedCache; - -/** - * An extension of {@link JcrSession} that is also an {@link XAResource}, enabling clients to explicitly enlist the session in the - * transaction. - *

- * This implementation simply delegates to the {@link AdvancedCache#getXAResource() Infinispan cache's XAResource} instance, since - * ModeShape does not need to itself be a resource that participates in the transaction. (Instead, ModeShape registers as a - * {@link Synchronization} on the transaction, ensuring that it is notified when the transaction completes.) - *

- */ -public class JcrXaSession extends JcrSession implements XAResource { - - protected JcrXaSession( JcrRepository repository, - String workspaceName, - ExecutionContext context, - Map sessionAttributes, - boolean readOnly ) { - super(repository, workspaceName, context, sessionAttributes, readOnly); - } - - protected final XAResource delegate() { - return repository.documentStore().xaResource(); - } - - @Override - public void start( Xid xid, - int flags ) throws XAException { - delegate().start(xid, flags); - } - - @Override - public void end( Xid xid, - int flags ) throws XAException { - delegate().end(xid, flags); - } - - @Override - public int prepare( Xid xid ) throws XAException { - return delegate().prepare(xid); - } - - @Override - public void commit( Xid xid, - boolean onePhase ) throws XAException { - delegate().commit(xid, onePhase); - } - - @Override - public void rollback( Xid xid ) throws XAException { - delegate().rollback(xid); - } - - @Override - public void forget( Xid xid ) throws XAException { - delegate().forget(xid); - } - - @Override - public Xid[] recover( int flag ) throws XAException { - return delegate().recover(flag); - } - - @Override - public boolean isSameRM( XAResource xaRes ) throws XAException { - return delegate().isSameRM(xaRes); - } - - @Override - public int getTransactionTimeout() throws XAException { - return delegate().getTransactionTimeout(); - } - - @Override - public boolean setTransactionTimeout( int seconds ) throws XAException { - return delegate().setTransactionTimeout(seconds); - } - -} diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryEnvironment.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/RepositoryEnvironment.java similarity index 75% rename from modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryEnvironment.java rename to modeshape-jcr/src/main/java/org/modeshape/jcr/RepositoryEnvironment.java index d5b368c5bf..7e2032326f 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryEnvironment.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/RepositoryEnvironment.java @@ -13,10 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.modeshape.jcr.cache; +package org.modeshape.jcr; -import org.modeshape.jcr.NodeTypes; -import org.modeshape.jcr.cache.document.TransactionalWorkspaceCaches; import org.modeshape.jcr.txn.Transactions; /** @@ -25,19 +23,12 @@ public interface RepositoryEnvironment { /** - * Get the interface for working with transactions. + * Returns the repository's {@link Transactions} instance * - * @return the transactions object + * @return a {@link Transactions} instance, never {@code null} */ Transactions getTransactions(); - /** - * Get the factory used to obtain the transactional workspace caches. - * - * @return the factory; never null - */ - TransactionalWorkspaceCaches getTransactionalWorkspaceCacheFactory(); - /** * Returns the id of the repository's {@link org.modeshape.jcr.journal.ChangeJournal} * diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryCache.java index 288e6c2739..b2d5c12993 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/RepositoryCache.java @@ -49,6 +49,7 @@ import org.modeshape.jcr.ModeShape; import org.modeshape.jcr.ModeShapeLexicon; import org.modeshape.jcr.RepositoryConfiguration; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.Upgrades; import org.modeshape.jcr.api.value.DateTime; import org.modeshape.jcr.bus.ChangeBus; @@ -66,6 +67,7 @@ import org.modeshape.jcr.cache.document.LocalDocumentStore.DocumentOperation; import org.modeshape.jcr.cache.document.LocalDocumentStore.DocumentOperationResults; import org.modeshape.jcr.cache.document.ReadOnlySessionCache; +import org.modeshape.jcr.cache.document.TransactionalWorkspaceCaches; import org.modeshape.jcr.cache.document.WorkspaceCache; import org.modeshape.jcr.cache.document.WritableSessionCache; import org.modeshape.jcr.clustering.ClusteringService; @@ -118,6 +120,7 @@ public class RepositoryCache { protected final String systemWorkspaceName; protected final Logger logger; private final RepositoryEnvironment repositoryEnvironment; + private final TransactionalWorkspaceCaches txWorkspaceCaches; private final String processKey; protected final Upgrades upgrades; private volatile boolean initializingRepository = false; @@ -144,6 +147,7 @@ public RepositoryCache( ExecutionContext context, this.minimumStringLengthForBinaryStorage.set(configuration.getBinaryStorage().getMinimumStringSize()); this.translator = new DocumentTranslator(this.context, this.documentStore, this.minimumStringLengthForBinaryStorage.get()); this.repositoryEnvironment = repositoryEnvironment; + this.txWorkspaceCaches = new TransactionalWorkspaceCaches(repositoryEnvironment.getTransactions()); this.processKey = context.getProcessId(); this.logger = Logger.getLogger(getClass()); this.rootNodeId = RepositoryConfiguration.ROOT_NODE_ID; @@ -188,7 +192,7 @@ public RepositoryCache( ExecutionContext context, doc.setNumber(REPOSITORY_UPGRADE_ID_FIELD_NAME, upgrades.getLatestAvailableUpgradeId()); // store the repository info - if (this.documentStore.localStore().putIfAbsent(REPOSITORY_INFO_KEY, doc) != null) { + if (this.documentStore.storeDocument(REPOSITORY_INFO_KEY, doc) != null) { // if clustered, we should be holding a cluster-wide lock, so if some other process managed to write under this // key, // smth is seriously wrong. If not clustered, only 1 thread will always perform repository initialization. @@ -613,10 +617,10 @@ protected void refreshRepositoryMetadata( boolean update ) { // We need to create a new entry ... EditableDocument newDoc = Schematic.newDocument(); translator.setKey(newDoc, systemMetadataKey); - systemEntry = documentStore().localStore().putIfAbsent(systemMetadataKeyStr, newDoc); + systemEntry = documentStore.storeDocument(systemMetadataKeyStr, newDoc); if (systemEntry == null) { // Read-read the entry that we just put, so we can populate it with the same code that edits it ... - systemEntry = documentStore().localStore().get(systemMetadataKeyStr); + systemEntry = documentStore.get(systemMetadataKeyStr); } } EditableDocument doc = documentStore().localStore().edit(systemMetadataKeyStr, true, false); @@ -843,12 +847,11 @@ WorkspaceCache workspace( final String name ) { documentStore, translator, rootKey, nodeCache, changeBus, repositoryEnvironment()); - if (documentStore.localStore().putIfAbsent(rootKey.toString(), rootDoc) == null) { + if (documentStore.storeDocument(rootKey.toString(), rootDoc) == null) { // we are the first node to perform the initialization, so we need to link the system node if (!RepositoryCache.this.systemWorkspaceName.equals(name)) { logger.debug("Creating '{0}' workspace in repository '{1}'", name, getName()); - SessionCache workspaceSession = new WritableSessionCache(context, result, - repositoryEnvironment); + SessionCache workspaceSession = new WritableSessionCache(context, result, txWorkspaceCaches, repositoryEnvironment); MutableCachedNode workspaceRootNode = workspaceSession.mutable(workspaceSession.getRootKey()); workspaceRootNode.linkChild(workspaceSession, RepositoryCache.this.systemKey, JcrLexicon.SYSTEM); @@ -878,7 +881,7 @@ void removeWorkspaceCaches( String name ) { WorkspaceCache removed = this.workspaceCachesByName.remove(name); if (removed != null) { removed.signalDeleted(); - repositoryEnvironment.getTransactionalWorkspaceCacheFactory().remove(name); + txWorkspaceCaches.rollbackActiveTransactionsForWorkspace(name); } } @@ -1074,9 +1077,9 @@ public SessionCache createSession(ExecutionContext context, boolean readOnly) { WorkspaceCache workspaceCache = workspace(workspaceName); if (readOnly) { - return new ReadOnlySessionCache(context, workspaceCache, repositoryEnvironment); + return new ReadOnlySessionCache(context, workspaceCache); } - return new WritableSessionCache(context, workspaceCache, repositoryEnvironment); + return new WritableSessionCache(context, workspaceCache, txWorkspaceCaches, repositoryEnvironment); } /** diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/AbstractSessionCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/AbstractSessionCache.java index 238bddca8f..2e6ac8de5c 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/AbstractSessionCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/AbstractSessionCache.java @@ -20,24 +20,15 @@ import java.util.Iterator; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -import javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.SystemException; -import javax.transaction.Transaction; import org.modeshape.common.annotation.Immutable; import org.modeshape.common.logging.Logger; import org.modeshape.jcr.ExecutionContext; -import org.modeshape.jcr.JcrI18n; import org.modeshape.jcr.api.value.DateTime; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.ChildReference; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; -import org.modeshape.jcr.txn.Transactions; -import org.modeshape.jcr.value.BinaryKey; import org.modeshape.jcr.value.NameFactory; import org.modeshape.jcr.value.Path; import org.modeshape.jcr.value.PathFactory; @@ -70,18 +61,14 @@ public String getUserId() { } private final WorkspaceCache sharedWorkspaceCache; - private final AtomicReference workspaceCache = new AtomicReference(); + private final AtomicReference workspaceCache = new AtomicReference<>(); private final NameFactory nameFactory; private final PathFactory pathFactory; private final Path rootPath; - private final RepositoryEnvironment repositoryEnvironment; - private final ConcurrentHashMap completeTxFunctionByTxId = new ConcurrentHashMap<>(); - private ExecutionContext context; - protected AbstractSessionCache( ExecutionContext context, - WorkspaceCache sharedWorkspaceCache, - RepositoryEnvironment repositoryEnvironment ) { + protected AbstractSessionCache(ExecutionContext context, + WorkspaceCache sharedWorkspaceCache) { this.context = context; this.sharedWorkspaceCache = sharedWorkspaceCache; this.workspaceCache.set(sharedWorkspaceCache); @@ -89,74 +76,10 @@ protected AbstractSessionCache( ExecutionContext context, this.nameFactory = factories.getNameFactory(); this.pathFactory = factories.getPathFactory(); this.rootPath = this.pathFactory.createRootPath(); - this.repositoryEnvironment = repositoryEnvironment; - assert this.repositoryEnvironment != null; - checkForTransaction(); } protected abstract Logger logger(); - /** - * Signal that this session cache should check for an existing transaction and use the appropriate workspace cache. If there - * is a (new to this session) transaction, then this session will use a transaction-specific workspace cache (shared by other - * sessions participating in the same transaction), and upon completion of the transaction the session will switch back to the - * shared workspace cache. - */ - @Override - public void checkForTransaction() { - try { - Transactions transactions = repositoryEnvironment.getTransactions(); - Transaction txn = transactions.getTransactionManager().getTransaction(); - if (txn != null && txn.getStatus() == Status.STATUS_ACTIVE) { - // There is an active transaction, so we need a transaction-specific workspace cache ... - workspaceCache.set(repositoryEnvironment.getTransactionalWorkspaceCacheFactory() - .getTransactionalWorkspaceCache(sharedWorkspaceCache)); - // only register the function if there's an active ModeShape transaction because we need to run the - // function *only after* ISPN has committed its transaction & updated the cache - // if there isn't an active ModeShape transaction, one will become active later during "save" - // otherwise, "save" is never called meaning this cache should be discarded - Transactions.Transaction modeshapeTx = transactions.currentTransaction(); - if (modeshapeTx != null) { - // we can use the identity hash code as a tx id, because we essentially want a different tx function for each - // different transaction and as long as a tx is active, it should not be garbage collected, hence we should - // get different IDs for different transactions - final int txId = System.identityHashCode(modeshapeTx); - if (!completeTxFunctionByTxId.containsKey(txId)) { - // create and register the complete transaction function only once - Transactions.TransactionFunction completeFunction = new Transactions.TransactionFunction() { - @Override - public void execute() { - completeTransaction(txId); - } - }; - if (completeTxFunctionByTxId.putIfAbsent(txId, completeFunction) == null) { - // we only want 1 completion function per tx id - modeshapeTx.uponCompletion(completeFunction); - } - } - } - } else { - // There is no active transaction, so just use the shared workspace cache ... - completeTransaction(null); - } - } catch (SystemException e) { - logger().error(e, JcrI18n.errorDeterminingCurrentTransactionAssumingNone, workspaceName(), e.getMessage()); - } catch (RollbackException e) { - logger().error(e, JcrI18n.errorDeterminingCurrentTransactionAssumingNone, workspaceName(), e.getMessage()); - } - } - - /** - * Signal that the transaction that was active and in which this session participated has completed and that this session - * should no longer use a transaction-specific workspace cache. - */ - protected void completeTransaction(final Integer txId) { - workspaceCache.set(sharedWorkspaceCache); - if (txId != null) { - completeTxFunctionByTxId.remove(txId); - } - } - @Override public final SessionCache unwrap() { return this; @@ -195,11 +118,16 @@ final PathFactory pathFactory() { final Path rootPath() { return rootPath; } - - RepositoryEnvironment sessionContext() { - return repositoryEnvironment; + + final AbstractSessionCache setWorkspaceCache(WorkspaceCache cache) { + this.workspaceCache.set(cache); + return this; } - + + final WorkspaceCache sharedWorkspaceCache() { + return sharedWorkspaceCache; + } + @Override public final void addContextData( String key, String value ) { @@ -306,13 +234,4 @@ public final void clear() { protected abstract void doClear( CachedNode node ); protected abstract void doClear(); - - /** - * Register the fact that one or more binary values are being used or not used anymore by a node. - * - * @param nodeKey a {@link NodeKey} instance; may not be null. - * @param binaryKeys an array of {@link BinaryKey} instances; may not be null. - */ - protected abstract void addBinaryReference( NodeKey nodeKey, BinaryKey... binaryKeys ); - } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentStore.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentStore.java index 0cf7d898d4..5dad6b3f69 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentStore.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentStore.java @@ -18,7 +18,6 @@ import java.util.Collection; import javax.transaction.TransactionManager; -import javax.transaction.xa.XAResource; import org.infinispan.schematic.SchematicEntry; import org.infinispan.schematic.document.Document; import org.infinispan.schematic.document.EditableDocument; @@ -155,13 +154,6 @@ public EditableDocument edit( String key, */ public TransactionManager transactionManager(); - /** - * Returns a resource used in distributed transactions - * - * @return an {@link XAResource instance} or {@code null} - */ - public XAResource xaResource(); - /** * Returns a local store instance which will use the local Infinispan cache to store/retrieve information. * diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentTranslator.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentTranslator.java index b94ee5d7e9..11d480b884 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentTranslator.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/DocumentTranslator.java @@ -47,12 +47,12 @@ import org.modeshape.jcr.ExecutionContext; import org.modeshape.jcr.JcrLexicon; import org.modeshape.jcr.NodeTypes; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.api.value.DateTime; import org.modeshape.jcr.cache.CachedNode.ReferenceType; import org.modeshape.jcr.cache.ChildReference; import org.modeshape.jcr.cache.ChildReferences; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.document.SessionNode.ChangedAdditionalParents; import org.modeshape.jcr.cache.document.SessionNode.ChangedChildren; import org.modeshape.jcr.cache.document.SessionNode.Insertions; diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/LocalDocumentStore.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/LocalDocumentStore.java index d24fb05ce7..7256e85562 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/LocalDocumentStore.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/LocalDocumentStore.java @@ -26,7 +26,6 @@ import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.TransactionManager; -import javax.transaction.xa.XAResource; import org.infinispan.Cache; import org.infinispan.distexec.DistributedCallable; import org.infinispan.schematic.Schematic; @@ -74,7 +73,7 @@ public SchematicEntry get( String key ) { @Override public SchematicEntry storeDocument( String key, Document document ) { - return putIfAbsent(key, document); + return database.putIfAbsent(key, document); } @Override @@ -92,19 +91,6 @@ public String newDocumentKey( String parentKey, return null; } - /** - * Store the supplied document and metadata at the given key. - * - * @param key the key or identifier for the document - * @param document the document that is to be stored - * @return the existing entry for the supplied key, or null if there was no entry and the put was successful - * @see SchematicDb#putIfAbsent(String, org.infinispan.schematic.document.Document) - */ - public SchematicEntry putIfAbsent( String key, - Document document ) { - return database.putIfAbsent(key, document); - } - /** * Store the supplied document and metadata at the given key. * @@ -126,18 +112,6 @@ public void put( Document entryDocument ) { database.put(entryDocument); } - /** - * Replace the existing document and metadata at the given key with the document that is supplied. This method does nothing if - * there is not an existing entry at the given key. - * - * @param key the key or identifier for the document - * @param document the new document that is to replace the existing document (or binary content) the replacement - */ - public void replace( String key, - Document document ) { - database.replace(key, document); - } - @Override public boolean remove( String key ) { return database.remove(key) != null; @@ -171,11 +145,6 @@ public TransactionManager transactionManager() { return localCache().getAdvancedCache().getTransactionManager(); } - @Override - public XAResource xaResource() { - return localCache().getAdvancedCache().getXAResource(); - } - @Override public void setLocalSourceKey( String sourceKey ) { this.localSourceKey = sourceKey; diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/ReadOnlySessionCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/ReadOnlySessionCache.java index 142f5e43b9..e9f11eacf7 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/ReadOnlySessionCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/ReadOnlySessionCache.java @@ -22,9 +22,7 @@ import org.modeshape.jcr.ExecutionContext; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; -import org.modeshape.jcr.value.BinaryKey; /** * A read-only {@link SessionCache} implementation. @@ -34,10 +32,9 @@ public class ReadOnlySessionCache extends AbstractSessionCache { private static final Logger LOGGER = Logger.getLogger(ReadOnlySessionCache.class); - public ReadOnlySessionCache( ExecutionContext context, - WorkspaceCache workspaceCache, - RepositoryEnvironment repositoryEnvironment ) { - super(context, workspaceCache, repositoryEnvironment); + public ReadOnlySessionCache(ExecutionContext context, + WorkspaceCache workspaceCache) { + super(context, workspaceCache); } @Override @@ -70,11 +67,6 @@ protected void doClear( CachedNode node ) { // do nothing } - @Override - protected void addBinaryReference( NodeKey nodeKey, BinaryKey... binaryKeys ) { - // do nothing - } - @Override public void save() { // do nothing @@ -120,4 +112,8 @@ public String toString() { return sb.toString(); } + @Override + public void checkForTransaction() { + // do nothing + } } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/SessionNode.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/SessionNode.java index b4d802d6ab..add228d734 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/SessionNode.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/SessionNode.java @@ -47,6 +47,7 @@ import org.modeshape.jcr.JcrSession; import org.modeshape.jcr.ModeShapeLexicon; import org.modeshape.jcr.NodeTypes; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.api.Binary; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.ChildReference; @@ -60,7 +61,6 @@ import org.modeshape.jcr.cache.PathCache; import org.modeshape.jcr.cache.ReferrerCounts; import org.modeshape.jcr.cache.ReferrerCounts.MutableReferrerCounts; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; import org.modeshape.jcr.cache.WrappedException; import org.modeshape.jcr.value.BinaryKey; @@ -947,7 +947,7 @@ private void processPropertyChange( SessionCache cache, // if there are any binary usage changes, send them to the session cache if (!binaryChanges.isEmpty()) { - session(cache).addBinaryReference(key, binaryChanges.toArray(new BinaryKey[binaryChanges.size()])); + writableSession(cache).addBinaryReference(key, binaryChanges.toArray(new BinaryKey[binaryChanges.size()])); } // if an existing reference property was just updated with the same value in the same order, it is a no-op so we should just diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCache.java index c222c3b2b5..60d7966598 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCache.java @@ -17,11 +17,8 @@ import java.util.concurrent.ConcurrentHashMap; import javax.transaction.Transaction; -import org.modeshape.jcr.cache.CachedNode; -import org.modeshape.jcr.cache.NodeKey; import org.modeshape.jcr.cache.RepositoryCache; import org.modeshape.jcr.cache.change.ChangeSet; -import org.modeshape.jcr.cache.document.TransactionalWorkspaceCaches.OnEachTransactionalCache; import org.modeshape.jcr.txn.SynchronizedTransactions; /** @@ -46,17 +43,17 @@ public class TransactionalWorkspaceCache extends WorkspaceCache { private final WorkspaceCache sharedWorkspaceCache; - private final TransactionalWorkspaceCaches cacheManager; + private final TransactionalWorkspaceCaches txWorkspaceCaches; private final Transaction txn; protected TransactionalWorkspaceCache( WorkspaceCache sharedWorkspaceCache, - TransactionalWorkspaceCaches cacheManager, - Transaction txn ) { + TransactionalWorkspaceCaches txWorkspaceCaches, + Transaction txn) { // Use a new in-memory map for the transactional cache ... - super(sharedWorkspaceCache, new ConcurrentHashMap()); + super(sharedWorkspaceCache, new ConcurrentHashMap<>()); this.sharedWorkspaceCache = sharedWorkspaceCache; this.txn = txn; - this.cacheManager = cacheManager; + this.txWorkspaceCaches = txWorkspaceCaches; } @Override @@ -75,29 +72,19 @@ public void changed( ChangeSet changes ) { * @see SynchronizedTransactions#updateCache(WorkspaceCache, ChangeSet, org.modeshape.jcr.txn.Transactions.Transaction) */ public void changedWithinTransaction( final ChangeSet changes ) { - cacheManager.onAllWorkspacesInTransaction(txn, new OnEachTransactionalCache() { - @Override - public void execute( TransactionalWorkspaceCache cache ) { - cache.internalChangedWithinTransaction(changes); - } - }); + txWorkspaceCaches.workspaceCachesFor(txn).forEach(cache -> cache.internalChangedWithinTransaction(changes)); } @Override public void clear() { - cacheManager.onAllWorkspacesInTransaction(txn, new OnEachTransactionalCache() { - @Override - public void execute( TransactionalWorkspaceCache cache ) { - cache.internalClear(); - } - }); + txWorkspaceCaches.workspaceCachesFor(txn).forEach(TransactionalWorkspaceCache::internalClear); } - void internalClear() { + protected void internalClear() { super.clear(); } - void internalChangedWithinTransaction( ChangeSet changes ) { + protected void internalChangedWithinTransaction( ChangeSet changes ) { // Handle it ourselves ... super.changed(changes); } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCaches.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCaches.java index 81528a1167..199b9be67f 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCaches.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/TransactionalWorkspaceCaches.java @@ -15,10 +15,12 @@ */ package org.modeshape.jcr.cache.document; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.stream.Stream; import javax.transaction.RollbackException; import javax.transaction.Status; import javax.transaction.Synchronization; @@ -26,6 +28,7 @@ import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.modeshape.common.logging.Logger; +import org.modeshape.common.util.CheckArg; import org.modeshape.jcr.JcrI18n; import org.modeshape.jcr.txn.Transactions; @@ -33,120 +36,77 @@ * A manager for keeping track of transaction-specific WorkspaceCache instances. */ public class TransactionalWorkspaceCaches { - - private TransactionManager txnMgr; - private Map> transactionalCachesByTransaction = new HashMap>(); + private static final Logger LOGGER = Logger.getLogger(TransactionalWorkspaceCache.class); + + private final TransactionManager txnMgr; + private final Map> transactionalCachesByTransaction = new HashMap<>(); public TransactionalWorkspaceCaches( Transactions transactions ) { - this.txnMgr = transactions != null ? transactions.getTransactionManager() : null; + CheckArg.isNotNull(transactions, "transactions"); + this.txnMgr = transactions.getTransactionManager(); } - - public WorkspaceCache getTransactionalWorkspaceCache( WorkspaceCache sharedWorkspaceCache ) - throws SystemException, RollbackException { - if (txnMgr == null) return sharedWorkspaceCache; - + + protected WorkspaceCache getTransactionalCache(WorkspaceCache globalWorkspaceCache) throws Exception { // Get the current transaction ... Transaction txn = txnMgr.getTransaction(); - if (txn == null || txn.getStatus() != Status.STATUS_ACTIVE) return sharedWorkspaceCache; - - synchronized (this) { - String workspaceName = sharedWorkspaceCache.getWorkspaceName(); - Map workspaceCachesForTransaction = transactionalCachesByTransaction.get(txn); - if (workspaceCachesForTransaction == null) { - // No transactional caches for this transaction yet ... - workspaceCachesForTransaction = new HashMap(); - transactionalCachesByTransaction.put(txn, workspaceCachesForTransaction); - TransactionalWorkspaceCache newCache = createCache(sharedWorkspaceCache, txn); - workspaceCachesForTransaction.put(workspaceName, newCache); - return newCache; - } + if (txn == null || txn.getStatus() != Status.STATUS_ACTIVE) return globalWorkspaceCache; - TransactionalWorkspaceCache cache = workspaceCachesForTransaction.get(workspaceName); - if (cache != null) { - return cache; - } - // No transactional cache for this workspace ... - cache = createCache(sharedWorkspaceCache, txn); - workspaceCachesForTransaction.put(workspaceName, cache); - return cache; - } - } - - public synchronized void remove( String workspaceName ) { - if (txnMgr == null) return; - Set transactions = new HashSet(); synchronized (this) { - for (Map.Entry> entry : transactionalCachesByTransaction.entrySet()) { - if (entry.getValue().containsKey(workspaceName)) { - transactions.add(entry.getKey()); - } - } - } - for (Transaction transaction : transactions) { - try { - // rollback the transaction ... - transaction.rollback(); - } catch (SystemException e) { - Logger.getLogger(getClass()) - .error(JcrI18n.errorWhileRollingBackActiveTransactionUsingWorkspaceThatIsBeingDeleted, - workspaceName, - e.getMessage()); - } + String workspaceName = globalWorkspaceCache.getWorkspaceName(); + return transactionalCachesByTransaction.computeIfAbsent(txn, tx -> new HashMap<>()) + .computeIfAbsent(workspaceName, + wsName -> createCache(globalWorkspaceCache, txn)); } } + + public synchronized void rollbackActiveTransactionsForWorkspace(String workspaceName) { + List toRemove = new ArrayList<>(); + // first rollback all active transactions and collect them at the same time... + transactionalCachesByTransaction.entrySet().stream() + .filter(entry -> entry.getValue().containsKey(workspaceName)) + .map(Map.Entry::getKey) + .forEach(tx -> { + toRemove.add(tx); + try { + tx.rollback(); + } catch (SystemException e) { + LOGGER.error(e, + JcrI18n.errorWhileRollingBackActiveTransactionUsingWorkspaceThatIsBeingDeleted, + workspaceName, e.getMessage()); + } + }); + // then remove them from the map + toRemove.stream().forEach(transactionalCachesByTransaction::remove); + } protected synchronized void remove( Transaction txn ) { transactionalCachesByTransaction.remove(txn); } - - /** - * Invoke the supplied operation on each of the transactional workspace caches associated with the supplied transaction. - * - * @param txn the transaction; may not be null - * @param operation the operation to call on each {@link TransactionalWorkspaceCache} in the given transaction; may not be - * null - */ - synchronized void onAllWorkspacesInTransaction( final Transaction txn, - final OnEachTransactionalCache operation ) { - assert operation != null; - assert txn != null; - Map cachesForTxn = transactionalCachesByTransaction.get(txn); - if (cachesForTxn != null) { - for (TransactionalWorkspaceCache cache : cachesForTxn.values()) { - if (cache != null) operation.execute(cache); - } - } - } - - /** - * See #onAllWorkspacesInTransaction - */ - static interface OnEachTransactionalCache { - /** - * Invoke the operation on the supplied cache - * - * @param cache the transactional workspace cache; never null - */ - void execute( TransactionalWorkspaceCache cache ); + + protected synchronized Stream workspaceCachesFor(final Transaction txn) { + return transactionalCachesByTransaction.getOrDefault(txn, Collections.emptyMap()).values().stream(); } - protected TransactionalWorkspaceCache createCache( WorkspaceCache sharedWorkspaceCache, - final Transaction txn ) throws SystemException, RollbackException { + private TransactionalWorkspaceCache createCache(WorkspaceCache sharedWorkspaceCache, + final Transaction txn) { final TransactionalWorkspaceCache cache = new TransactionalWorkspaceCache(sharedWorkspaceCache, this, txn); - txn.registerSynchronization(new Synchronization() { - - @Override - public void beforeCompletion() { - // do nothing ... - } - - @Override - public void afterCompletion( int status ) { - // No matter what, remove this transactional cache from the maps ... - remove(txn); - } - }); + try { + txn.registerSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + // do nothing ... + } + + @Override + public void afterCompletion( int status ) { + // No matter what, remove this transactional cache from the maps ... + remove(txn); + } + }); + } catch (RollbackException | SystemException e) { + throw new RuntimeException(e); + } return cache; } } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WorkspaceCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WorkspaceCache.java index 666f822f48..ef1624a3c8 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WorkspaceCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WorkspaceCache.java @@ -24,12 +24,12 @@ import org.modeshape.common.logging.Logger; import org.modeshape.jcr.ExecutionContext; import org.modeshape.jcr.JcrI18n; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.bus.ChangeBus; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.ChildReference; import org.modeshape.jcr.cache.NodeCache; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.WorkspaceNotFoundException; import org.modeshape.jcr.cache.change.ChangeSet; import org.modeshape.jcr.cache.change.ChangeSetListener; diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WritableSessionCache.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WritableSessionCache.java index d94dd2b8f8..88c1a9147d 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WritableSessionCache.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/cache/document/WritableSessionCache.java @@ -39,6 +39,7 @@ import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; +import javax.transaction.Status; import javax.transaction.SystemException; import org.infinispan.schematic.Schematic; import org.infinispan.schematic.SchematicEntry; @@ -54,6 +55,7 @@ import org.modeshape.jcr.JcrI18n; import org.modeshape.jcr.JcrLexicon; import org.modeshape.jcr.NodeTypes; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.TimeoutException; import org.modeshape.jcr.api.Binary; import org.modeshape.jcr.api.value.DateTime; @@ -72,7 +74,6 @@ import org.modeshape.jcr.cache.NodeNotFoundException; import org.modeshape.jcr.cache.PathCache; import org.modeshape.jcr.cache.ReferentialIntegrityException; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; import org.modeshape.jcr.cache.WrappedException; import org.modeshape.jcr.cache.change.ChangeSet; @@ -119,33 +120,44 @@ public class WritableSessionCache extends AbstractSessionCache { private static final long PAUSE_TIME_BEFORE_REPEAT_FOR_LOCK_ACQUISITION_TIMEOUT = 50L; private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Transactions txns; + private final RepositoryEnvironment repositoryEnvironment; + private final ConcurrentHashMap completeTxFunctionByTxId = new ConcurrentHashMap<>(); + private final TransactionalWorkspaceCaches txWorkspaceCaches; private Map changedNodes; private Set replacedNodes; private LinkedHashSet changedNodesInOrder; private Map referrerChangesForRemovedNodes; - private final Transactions txns; - + + /** * Track the binary keys which are being referenced/unreferenced by nodes so they can be locked in ISPN */ - private final ConcurrentHashMap> binaryReferencesByNodeKey; + private final ConcurrentHashMap> binaryReferencesByNodeKey; /** * Create a new SessionCache that can be used for making changes to the workspace. * * @param context the execution context; may not be null * @param workspaceCache the (shared) workspace cache; may not be null + * @param txWorkspaceCaches a {@link TransactionalWorkspaceCaches} instance, never {@code null} * @param repositoryEnvironment the context for the session; may not be null */ - public WritableSessionCache( ExecutionContext context, - WorkspaceCache workspaceCache, - RepositoryEnvironment repositoryEnvironment ) { - super(context, workspaceCache, repositoryEnvironment); - this.changedNodes = new HashMap(); - this.changedNodesInOrder = new LinkedHashSet(); - this.referrerChangesForRemovedNodes = new HashMap(); - this.binaryReferencesByNodeKey = new ConcurrentHashMap>(); + public WritableSessionCache(ExecutionContext context, + WorkspaceCache workspaceCache, + TransactionalWorkspaceCaches txWorkspaceCaches, + RepositoryEnvironment repositoryEnvironment) { + super(context, workspaceCache); + this.changedNodes = new HashMap<>(); + this.changedNodesInOrder = new LinkedHashSet<>(); + this.referrerChangesForRemovedNodes = new HashMap<>(); + this.binaryReferencesByNodeKey = new ConcurrentHashMap<>(); + assert repositoryEnvironment != null; this.txns = repositoryEnvironment.getTransactions(); + this.repositoryEnvironment = repositoryEnvironment; + assert txWorkspaceCaches != null; + this.txWorkspaceCaches = txWorkspaceCaches; + checkForTransaction(); } protected final void assertInSession( SessionNode node ) { @@ -376,6 +388,60 @@ public boolean hasChanges() { } } + /** + * Signal that this session cache should check for an existing transaction and use the appropriate workspace cache. If there + * is a (new to this session) transaction, then this session will use a transaction-specific workspace cache (shared by other + * sessions participating in the same transaction), and upon completion of the transaction the session will switch back to the + * shared workspace cache. + */ + @Override + public void checkForTransaction() { + try { + Transactions transactions = repositoryEnvironment.getTransactions(); + javax.transaction.Transaction txn = transactions.getTransactionManager().getTransaction(); + if (txn != null && txn.getStatus() == Status.STATUS_ACTIVE) { + // There is an active transaction, so we need a transaction-specific workspace cache ... + setWorkspaceCache(txWorkspaceCaches.getTransactionalCache(getWorkspace())); + // only register the function if there's an active ModeShape transaction because we need to run the + // function *only after* ISPN has committed its transaction & updated the cache + // if there isn't an active ModeShape transaction, one will become active later during "save" + // otherwise, "save" is never called meaning this cache should be discarded + Transactions.Transaction modeshapeTx = transactions.currentModeShapeTransaction(); + if (modeshapeTx != null) { + // we can use the identity hash code as a tx id, because we essentially want a different tx function for each + // different transaction and as long as a tx is active, it should not be garbage collected, hence we should + // get different IDs for different transactions + final int txId = System.identityHashCode(modeshapeTx); + if (!completeTxFunctionByTxId.containsKey(txId)) { + // create and register the complete transaction function only once + Transactions.TransactionFunction completeFunction = () -> completeTransaction(txId); + if (completeTxFunctionByTxId.putIfAbsent(txId, completeFunction) == null) { + // we only want 1 completion function per tx id + modeshapeTx.uponCompletion(completeFunction); + } + } + } + } else { + // There is no active transaction, so just use the shared workspace cache ... + completeTransaction(null); + } + } catch (Exception e) { + logger().error(e, JcrI18n.errorDeterminingCurrentTransactionAssumingNone, workspaceName(), e.getMessage()); + } + } + + /** + * Signal that the transaction that was active and in which this session participated has completed and that this session + * should no longer use a transaction-specific workspace cache. + */ + private void completeTransaction(final Integer txId) { + // reset the ws cache to the shared (global one) + setWorkspaceCache(sharedWorkspaceCache()); + if (txId != null) { + completeTxFunctionByTxId.remove(txId); + } + } + protected final void logChangesBeingSaved( Iterable firstNodesInOrder, Map firstNodes, Iterable secondNodesInOrder, @@ -911,7 +977,7 @@ protected ChangeSet persistChanges( Iterable changedNodesInOrder, String workspaceName = persistedCache.getWorkspaceName(); String repositoryKey = persistedCache.getRepositoryKey(); RecordingChanges changes = new RecordingChanges(context.getId(), context.getProcessId(), repositoryKey, workspaceName, - sessionContext().journalId()); + repositoryEnvironment.journalId()); // Get the documentStore ... DocumentStore documentStore = persistedCache.documentStore(); @@ -1607,13 +1673,7 @@ public void destroy( NodeKey key ) { if (property != null) { if (property.isBinary() && !node.isPropertyNew(this, property.getName())) { // We need to register the binary value as not being in use anymore - for (Object binaryObject : property.getValuesAsArray()) { - assert binaryObject instanceof Binary; - if (binaryObject instanceof AbstractBinary) { - BinaryKey binaryKey = ((AbstractBinary)binaryObject).getKey(); - addBinaryReference(nodeKey, binaryKey); - } - } + collectBinaryReferences(nodeKey, property); } } } @@ -1641,13 +1701,7 @@ public void destroy( NodeKey key ) { if (property.isBinary()) { // We need to register the binary value as not being in use anymore - for (Object binaryObject : property.getValuesAsArray()) { - assert binaryObject instanceof Binary; - if (binaryObject instanceof AbstractBinary) { - BinaryKey binaryKey = ((AbstractBinary)binaryObject).getKey(); - addBinaryReference(nodeKey, binaryKey); - } - } + collectBinaryReferences(nodeKey, property); } } } @@ -1689,19 +1743,28 @@ public void destroy( NodeKey key ) { } } + private void collectBinaryReferences(NodeKey nodeKey, Property property) { + property.forEach(value->{ + assert value instanceof Binary; + if (value instanceof AbstractBinary) { + BinaryKey binaryKey = ((AbstractBinary)value).getKey(); + addBinaryReference(nodeKey, binaryKey); + } + }); + } + @Override public boolean isDestroyed( NodeKey key ) { return changedNodes.get(key) == REMOVED; } - @Override protected void addBinaryReference( NodeKey nodeKey, BinaryKey... binaryKeys ) { if (binaryKeys.length == 0) { return; } Set binaryReferencesForNode = this.binaryReferencesByNodeKey.get(nodeKey); if (binaryReferencesForNode == null) { - Set emptySet = Collections.newSetFromMap(new ConcurrentHashMap()); + Set emptySet = Collections.newSetFromMap(new ConcurrentHashMap<>()); binaryReferencesForNode = this.binaryReferencesByNodeKey.putIfAbsent(nodeKey, emptySet); if (binaryReferencesForNode == null) { binaryReferencesForNode = emptySet; diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/federation/FederatedDocumentStore.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/federation/FederatedDocumentStore.java index b8d0c02042..374fe036ff 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/federation/FederatedDocumentStore.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/federation/FederatedDocumentStore.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Set; import javax.transaction.TransactionManager; -import javax.transaction.xa.XAResource; import org.infinispan.schematic.SchematicEntry; import org.infinispan.schematic.document.Document; import org.infinispan.schematic.document.EditableDocument; @@ -118,7 +117,7 @@ public String newDocumentKey( String parentKey, public SchematicEntry storeDocument( String key, Document document ) { if (isLocalSource(key)) { - return localStore().putIfAbsent(key, document); + return localStore().storeDocument(key, document); } Connector connector = connectors.getConnectorForSourceKey(sourceKey(key)); if (connector != null) { @@ -347,11 +346,6 @@ public TransactionManager transactionManager() { return localStore().transactionManager(); } - @Override - public XAResource xaResource() { - return localStore().xaResource(); - } - @Override public void setLocalSourceKey( String localSourceKey ) { this.localDocumentStore.setLocalSourceKey(localSourceKey); diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/NoClientTransactions.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/NoClientTransactions.java index 0b68dcf01d..a3e771e321 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/NoClientTransactions.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/NoClientTransactions.java @@ -44,7 +44,7 @@ public NoClientTransactions( TransactionManager txnMgr ) { } @Override - public Transaction currentTransaction() { + public Transaction currentModeShapeTransaction() { return ACTIVE_TRANSACTION.get(); } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/SynchronizedTransactions.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/SynchronizedTransactions.java index b16e4d57ee..3e21ac59b3 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/SynchronizedTransactions.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/SynchronizedTransactions.java @@ -67,7 +67,7 @@ public SynchronizedTransactions( TransactionManager txnMgr, Cache localCache ) { } @Override - public Transaction currentTransaction() { + public Transaction currentModeShapeTransaction() { Transaction localTx = LOCAL_TRANSACTION.get(); if (localTx != null) { return localTx; @@ -132,14 +132,11 @@ public Transaction begin() throws NotSupportedException, SystemException { txnMgr.begin(); localTx = new NestableThreadLocalTransaction(txnMgr, LOCAL_TRANSACTION).begin(); // we'll resume the original transaction once we've completed (regardless whether successfully or not) - localTx.uponCompletion(new TransactionFunction() { - @Override - public void execute() { - try { - txnMgr.resume(suspended); - } catch (Exception e) { - throw new RuntimeException(e); - } + localTx.uponCompletion(() -> { + try { + txnMgr.resume(suspended); + } catch (Exception e) { + throw new RuntimeException(e); } }); return logTransactionInformation(localTx); @@ -206,12 +203,7 @@ public void updateCache( final WorkspaceCache workspace, if (changes != null && !changes.isEmpty()) { if (transaction instanceof SynchronizedTransaction) { // only issue the changes when the transaction is successfully committed - transaction.uponCommit(new TransactionFunction() { - @Override - public void execute() { - workspace.changed(changes); - } - }); + transaction.uponCommit(() -> workspace.changed(changes)); if (workspace instanceof TransactionalWorkspaceCache) { ((TransactionalWorkspaceCache)workspace).changedWithinTransaction(changes); } diff --git a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/Transactions.java b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/Transactions.java index 4dfb12191a..f6ce9f33de 100644 --- a/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/Transactions.java +++ b/modeshape-jcr/src/main/java/org/modeshape/jcr/txn/Transactions.java @@ -121,11 +121,16 @@ public TransactionManager getTransactionManager() { public abstract Transaction begin() throws NotSupportedException, SystemException; /** - * Returns a the current ModeShape transaction, if one exists. + * Returns a the current ModeShape transaction, if one exists. + *

+ * A ModeShape transaction may not necessarily exist when a + * {@link javax.transaction.Transaction} is active. This is because ModeShape transactions are only created when a + * {@link org.modeshape.jcr.JcrSession} is saved. + *

* * @return either a {@link org.modeshape.jcr.txn.Transactions.Transaction instance} or {@code null} */ - public abstract Transaction currentTransaction(); + public abstract Transaction currentModeShapeTransaction(); /** * Notify the workspace of the supplied changes, if and when the current transaction is completed. If the current thread is diff --git a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/AbstractSessionCacheTest.java b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/AbstractSessionCacheTest.java index 45f93908b6..eed19858b8 100644 --- a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/AbstractSessionCacheTest.java +++ b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/AbstractSessionCacheTest.java @@ -19,14 +19,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import javax.transaction.TransactionManager; import org.modeshape.jcr.ExecutionContext; import org.modeshape.jcr.NodeTypes; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.bus.RepositoryChangeBus; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.NodeCache; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; import org.modeshape.jcr.cache.change.PrintingChangeSetListener; import org.modeshape.jcr.txn.NoClientTransactions; @@ -57,9 +56,10 @@ protected NodeCache createCache() { workspaceCache = new WorkspaceCache(context, "repo", "ws", null, documentStore, translator, ROOT_KEY_WS1, nodeCache, changeBus, null); loadJsonDocuments(resource(resourceNameForWorkspaceContentDocument())); - RepositoryEnvironment sessionEnv = createRepositoryEnvironment(); - session1 = createSessionCache(context, workspaceCache, sessionEnv); - session2 = createSessionCache(context, workspaceCache, sessionEnv); + RepositoryEnvironment repositoryEnv = createRepositoryEnvironment(); + TransactionalWorkspaceCaches txWsCaches = new TransactionalWorkspaceCaches(repositoryEnv.getTransactions()); + session1 = createSessionCache(context, workspaceCache, txWsCaches, repositoryEnv); + session2 = createSessionCache(context, workspaceCache, txWsCaches, repositoryEnv); return session1; } @@ -71,23 +71,14 @@ protected void shutdownCache( NodeCache cache ) { protected abstract SessionCache createSessionCache( ExecutionContext context, WorkspaceCache cache, + TransactionalWorkspaceCaches txWsCaches, RepositoryEnvironment sessionEnv ); protected RepositoryEnvironment createRepositoryEnvironment() { - final TransactionManager txnMgr = txnManager(); return new RepositoryEnvironment() { - private final Transactions transactions = new NoClientTransactions(txnMgr); - private final TransactionalWorkspaceCaches transactionalWorkspaceCacheFactory = new TransactionalWorkspaceCaches( - transactions); - @Override public Transactions getTransactions() { - return transactions; - } - - @Override - public TransactionalWorkspaceCaches getTransactionalWorkspaceCacheFactory() { - return transactionalWorkspaceCacheFactory; + return new NoClientTransactions(txnManager()); } @Override diff --git a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/DocumentOptimizerTest.java b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/DocumentOptimizerTest.java index 1645199a66..24d4a75402 100644 --- a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/DocumentOptimizerTest.java +++ b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/DocumentOptimizerTest.java @@ -28,10 +28,10 @@ import org.junit.Before; import org.junit.Test; import org.modeshape.jcr.ExecutionContext; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.cache.ChildReference; import org.modeshape.jcr.cache.MutableCachedNode; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; import org.modeshape.jcr.value.Name; import org.modeshape.jcr.value.Path.Segment; @@ -50,8 +50,9 @@ public void beforeEach() { @Override protected SessionCache createSessionCache( ExecutionContext context, WorkspaceCache cache, + TransactionalWorkspaceCaches txWsCaches, RepositoryEnvironment repositoryEnvironment ) { - return new WritableSessionCache(context, workspaceCache, repositoryEnvironment); + return new WritableSessionCache(context, workspaceCache, txWsCaches, repositoryEnvironment); } @Test diff --git a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/ReadOnlySessionCacheTest.java b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/ReadOnlySessionCacheTest.java index 4913bfd343..48977a6dee 100644 --- a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/ReadOnlySessionCacheTest.java +++ b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/ReadOnlySessionCacheTest.java @@ -16,7 +16,7 @@ package org.modeshape.jcr.cache.document; import org.modeshape.jcr.ExecutionContext; -import org.modeshape.jcr.cache.RepositoryEnvironment; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; /** @@ -27,7 +27,8 @@ public class ReadOnlySessionCacheTest extends AbstractSessionCacheTest { @Override protected SessionCache createSessionCache( ExecutionContext context, WorkspaceCache cache, + TransactionalWorkspaceCaches txWsCaches, RepositoryEnvironment repositoryEnvironment ) { - return new ReadOnlySessionCache(context, workspaceCache, repositoryEnvironment); + return new ReadOnlySessionCache(context, workspaceCache); } } diff --git a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/WritableSessionCacheTest.java b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/WritableSessionCacheTest.java index 8c76b4e707..534cfd95f2 100644 --- a/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/WritableSessionCacheTest.java +++ b/modeshape-jcr/src/test/java/org/modeshape/jcr/cache/document/WritableSessionCacheTest.java @@ -26,10 +26,10 @@ import org.junit.Test; import org.modeshape.common.statistic.Stopwatch; import org.modeshape.jcr.ExecutionContext; +import org.modeshape.jcr.RepositoryEnvironment; import org.modeshape.jcr.cache.CachedNode; import org.modeshape.jcr.cache.MutableCachedNode; import org.modeshape.jcr.cache.NodeKey; -import org.modeshape.jcr.cache.RepositoryEnvironment; import org.modeshape.jcr.cache.SessionCache; /** @@ -49,8 +49,9 @@ public void beforeEach() { @Override protected SessionCache createSessionCache( ExecutionContext context, WorkspaceCache cache, + TransactionalWorkspaceCaches txWsCaches, RepositoryEnvironment repositoryEnvironment ) { - return new WritableSessionCache(context, workspaceCache, repositoryEnvironment); + return new WritableSessionCache(context, workspaceCache, txWsCaches, repositoryEnvironment); } @Test diff --git a/modeshape-schematic/src/main/java/org/infinispan/schematic/SchematicDb.java b/modeshape-schematic/src/main/java/org/infinispan/schematic/SchematicDb.java index 04da6917a4..88f7f6b6b5 100644 --- a/modeshape-schematic/src/main/java/org/infinispan/schematic/SchematicDb.java +++ b/modeshape-schematic/src/main/java/org/infinispan/schematic/SchematicDb.java @@ -120,16 +120,6 @@ void put( String key, SchematicEntry putIfAbsent( String key, Document document ); - /** - * Replace the existing document and metadata at the given key with the document that is supplied. This method does nothing if - * there is not an existing entry at the given key. - * - * @param key the key or identifier for the document - * @param document the new document that is to replace the existing document (or binary content) - */ - void replace( String key, - Document document ); - /** * Remove the existing document at the given key. * diff --git a/modeshape-schematic/src/main/java/org/infinispan/schematic/internal/CacheSchematicDb.java b/modeshape-schematic/src/main/java/org/infinispan/schematic/internal/CacheSchematicDb.java index f21fb52853..788dd714b2 100644 --- a/modeshape-schematic/src/main/java/org/infinispan/schematic/internal/CacheSchematicDb.java +++ b/modeshape-schematic/src/main/java/org/infinispan/schematic/internal/CacheSchematicDb.java @@ -117,14 +117,6 @@ public SchematicEntry putIfAbsent( String key, return store.putIfAbsent(key, newEntry); } - @Override - public void replace( String key, - Document document ) { - SchematicEntryLiteral newEntry = new SchematicEntryLiteral(key, document); - // use storeForWriting because we don't care about the return type - i.e. we're doing a local replace - storeForWriting.replace(key, newEntry); - } - @Override public SchematicEntry remove( String key ) { // we can't use storeForWriting, because we care about the return value here