Skip to content

Commit

Permalink
Merge pull request #1570 from hchiorean/MODE-2610
Browse files Browse the repository at this point in the history
MODE-2610 Fixes another transactional issue
  • Loading branch information
Horia Chiorean committed Jun 13, 2016
2 parents 89e0453 + bbb3af0 commit 884530a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 80 deletions.
Expand Up @@ -72,12 +72,12 @@ public void changed( ChangeSet changes ) {
* @see Transactions#updateCache(WorkspaceCache, ChangeSet, org.modeshape.jcr.txn.Transactions.Transaction)
*/
public void changedWithinTransaction( final ChangeSet changes ) {
    // Delegate to the shared caches holder, which owns the transaction -> caches map and will
    // forward the change set to every workspace cache bound to this transaction (MODE-2610).
    // NOTE(review): the scrape left the pre-merge direct-stream dispatch line in place here;
    // only the delegation below belongs in the merged file.
    txWorkspaceCaches.dispatchChangesForTransaction(txn, changes);
}

@Override
public void clear() {
    // Clear every workspace cache registered for this transaction via the shared holder
    // (the holder owns the transaction -> caches map, so lookup + clear happen in one place).
    // NOTE(review): the pre-merge per-cache stream call was removed by MODE-2610; only the
    // delegation below belongs in the merged file.
    txWorkspaceCaches.clearAllCachesForTransaction(txn);
}

protected void internalClear() {
Expand Down
Expand Up @@ -17,10 +17,9 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
Expand All @@ -30,6 +29,7 @@
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.JcrI18n;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.txn.Transactions;

/**
Expand All @@ -39,7 +39,7 @@ public class TransactionalWorkspaceCaches {
private static final Logger LOGGER = Logger.getLogger(TransactionalWorkspaceCache.class);

private final TransactionManager txnMgr;
private final Map<Transaction, Map<String, TransactionalWorkspaceCache>> transactionalCachesByTransaction = new HashMap<>();
private final Map<Transaction, Map<String, TransactionalWorkspaceCache>> transactionalCachesByTransaction = new ConcurrentHashMap<>();

public TransactionalWorkspaceCaches( Transactions transactions ) {
CheckArg.isNotNull(transactions, "transactions");
Expand All @@ -51,22 +51,40 @@ protected WorkspaceCache getTransactionalCache(WorkspaceCache globalWorkspaceCac
Transaction txn = txnMgr.getTransaction();
if (txn == null || txn.getStatus() != Status.STATUS_ACTIVE) return globalWorkspaceCache;

String workspaceName = globalWorkspaceCache.getWorkspaceName();
return transactionalCachesByTransaction.computeIfAbsent(txn, this::newCacheMapForTransaction)
.computeIfAbsent(workspaceName, wsName -> new TransactionalWorkspaceCache(globalWorkspaceCache, this, txn));
}

private Map<String, TransactionalWorkspaceCache> newCacheMapForTransaction(final Transaction txn) {
try {
txn.registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
// do nothing ...
}

synchronized (this) {
String workspaceName = globalWorkspaceCache.getWorkspaceName();
return transactionalCachesByTransaction.computeIfAbsent(txn, tx -> new HashMap<>())
.computeIfAbsent(workspaceName,
wsName -> createCache(globalWorkspaceCache, txn));
@Override
public void afterCompletion(int status) {
// No matter what, remove this transactional cache from the maps ...
Map<String, TransactionalWorkspaceCache> cachesByWsName = transactionalCachesByTransaction.remove(txn);
cachesByWsName.clear();
}
});
} catch (RollbackException | SystemException e) {
throw new RuntimeException(e);
}
return new ConcurrentHashMap<>();
}

public synchronized void rollbackActiveTransactionsForWorkspace(String workspaceName) {
List<Transaction> toRemove = new ArrayList<>();
// first rollback all active transactions and collect them at the same time...
transactionalCachesByTransaction.entrySet().stream()
transactionalCachesByTransaction.entrySet()
.stream()
.filter(entry -> entry.getValue().containsKey(workspaceName))
.map(Map.Entry::getKey)
.forEach(tx -> {
.forEach(entry -> {
Transaction tx = entry.getKey();
toRemove.add(tx);
try {
tx.rollback();
Expand All @@ -77,36 +95,16 @@ public synchronized void rollbackActiveTransactionsForWorkspace(String workspace
}
});
// then remove them from the map
toRemove.stream().forEach(transactionalCachesByTransaction::remove);
transactionalCachesByTransaction.keySet().removeAll(toRemove);
}

/**
 * Discards any workspace caches still registered for the given transaction.
 *
 * @param txn the transaction whose cache entry should be dropped; may be absent from the map
 */
protected synchronized void remove( Transaction txn ) {
    // Removing via the key view is equivalent to Map.remove(key) and discards the whole
    // workspace-name -> cache map for this transaction in one step.
    transactionalCachesByTransaction.keySet().remove(txn);
}

protected synchronized Stream<TransactionalWorkspaceCache> workspaceCachesFor(final Transaction txn) {
return transactionalCachesByTransaction.getOrDefault(txn, Collections.emptyMap()).values().stream();
/**
 * Clears the content of every transactional workspace cache that belongs to the given transaction.
 * A transaction with no registered caches is a no-op.
 *
 * @param txn the transaction whose caches should be cleared; may not be {@code null}
 */
protected void clearAllCachesForTransaction(final Transaction txn) {
    Map<String, TransactionalWorkspaceCache> cachesByWorkspace = transactionalCachesByTransaction.get(txn);
    if (cachesByWorkspace == null) {
        return;
    }
    for (TransactionalWorkspaceCache workspaceCache : cachesByWorkspace.values()) {
        workspaceCache.internalClear();
    }
}

private TransactionalWorkspaceCache createCache(WorkspaceCache sharedWorkspaceCache,
final Transaction txn) {
final TransactionalWorkspaceCache cache = new TransactionalWorkspaceCache(sharedWorkspaceCache, this, txn);
try {
txn.registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
// do nothing ...
}

@Override
public void afterCompletion( int status ) {
// No matter what, remove this transactional cache from the maps ...
remove(txn);
}
});
} catch (RollbackException | SystemException e) {
throw new RuntimeException(e);
}
return cache;
/**
 * Forwards a change set to each transactional workspace cache registered for the given
 * transaction. A transaction with no registered caches is a no-op.
 *
 * @param txn the transaction whose caches should receive the changes; may not be {@code null}
 * @param changes the change set to dispatch; may not be {@code null}
 */
protected void dispatchChangesForTransaction(final Transaction txn, final ChangeSet changes) {
    Map<String, TransactionalWorkspaceCache> cachesByWorkspace = transactionalCachesByTransaction.get(txn);
    if (cachesByWorkspace == null) {
        return;
    }
    for (TransactionalWorkspaceCache workspaceCache : cachesByWorkspace.values()) {
        workspaceCache.internalChangedWithinTransaction(changes);
    }
}
}
Expand Up @@ -336,6 +336,7 @@ protected void loadFromDocumentStore(Set<String> keys) {
String key = entry.id();
Document document = entry.content();
NodeKey nodeKey = new NodeKey(key);
// in some cases (user transactions) we may be replacing an already-cached node; the overwrite is intentional
this.nodesByKey.put(nodeKey, new LazyCachedNode(nodeKey, document));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Read a fresh copy from the document store for '{0}' and stored it in the tx ws cache as '{1}'",
Expand Down
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
Expand Down Expand Up @@ -407,8 +406,13 @@ public void checkForTransaction() {
Transactions transactions = repositoryEnvironment.getTransactions();
javax.transaction.Transaction txn = transactions.getTransactionManager().getTransaction();
if (txn != null && txn.getStatus() == Status.STATUS_ACTIVE) {
WorkspaceCache workspaceCache = getWorkspace();
if (workspaceCache instanceof TransactionalWorkspaceCache) {
// we're already inside a transaction and have a tx workspace cache set
return;
}
// There is an active transaction, so we need a transaction-specific workspace cache ...
setWorkspaceCache(txWorkspaceCaches.getTransactionalCache(getWorkspace()));
setWorkspaceCache(txWorkspaceCaches.getTransactionalCache(workspaceCache));
// only register the function if there's an active ModeShape transaction because we need to run the
// function *only after* the persistent storage has been updated
// if there isn't an active ModeShape transaction, one will become active later during "save"
Expand Down Expand Up @@ -1521,61 +1525,64 @@ private void lockNodes(Collection<NodeKey> changedNodesInOrder) {
}
}
// Try to acquire from the DocumentStore locks for all the nodes that we're going to change ...
Set<String> keysToLock = changedNodesInOrder.stream().map(this::keysToLockForNode).collect(TreeSet::new,
TreeSet::addAll,
TreeSet::addAll);
Set<String> changedNodesKeys = changedNodesInOrder.stream().map(this::keysToLockForNode).collect(TreeSet::new,
TreeSet::addAll,
TreeSet::addAll);

// we may already have a list of locked nodes, so remove the ones that we've already locked (and we hold the lock for)
Transaction modeshapeTx = repositoryEnvironment.getTransactions().currentTransaction();
assert modeshapeTx != null;
String txId = modeshapeTx.id();
Set<String> lockedKeysForTx = LOCKED_KEYS_BY_TX_ID.computeIfAbsent(txId, id -> new LinkedHashSet<>());
if (keysToLock.removeAll(lockedKeysForTx)) {
if (lockedKeysForTx.containsAll(changedNodesKeys)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("The keys {0} have been locked previously as part of the transaction {1}; skipping them...",
lockedKeysForTx, txId);
changedNodesKeys, txId);
}
}
// no new locks that we don't already own are required, so just return...
if (keysToLock.isEmpty()) {
return;
}

int retryCountOnLockTimeout = 3;
boolean locksAcquired = false;
while (!locksAcquired && retryCountOnLockTimeout > 0) {
locksAcquired = documentStore.lockDocuments(keysToLock);
if (locksAcquired) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Locked the nodes: {0}", keysToLock);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Timeout while attempting to lock keys {0}. Retrying....", keysToLock);
} else {
// there are new nodes that we need to lock...
Set<String> newKeysToLock = new TreeSet<>(changedNodesKeys);
newKeysToLock.removeAll(lockedKeysForTx);
int retryCountOnLockTimeout = 3;
boolean locksAcquired = false;
while (!locksAcquired && retryCountOnLockTimeout > 0) {
locksAcquired = documentStore.lockDocuments(newKeysToLock);
if (locksAcquired) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Locked the nodes: {0}", newKeysToLock);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Timeout while attempting to lock keys {0}. Retrying....", newKeysToLock);
}
--retryCountOnLockTimeout;
}
--retryCountOnLockTimeout;
}
if (!locksAcquired) {
throw new TimeoutException(
"Timeout while attempting to lock the keys " + changedNodesKeys + " after " + retryCountOnLockTimeout +
" retry attempts.");
}
lockedKeysForTx.addAll(newKeysToLock);
}
if (!locksAcquired) {
throw new TimeoutException("Timeout while attempting to lock the keys " + keysToLock + " after " + retryCountOnLockTimeout +
" retry attempts.");
}
lockedKeysForTx.addAll(keysToLock);

// now that we've locked the keys, load all of the from the document store
// note that some of the keys may be new but it's important to pass the entire set down to the document store
workspaceCache.loadFromDocumentStore(keysToLock);
workspaceCache.loadFromDocumentStore(changedNodesKeys);
}

/**
 * Computes the set of document keys that must be locked when the given node changes: the key of
 * the node itself, plus the key of each binary-reference document the node points to.
 * <p>
 * NOTE(review): the diff scrape left the replaced stream-based computation interleaved with the
 * new loop-based one; only the loop version is kept so the method compiles.
 *
 * @param key the key of a changed node; may not be {@code null}
 * @return a sorted set of document keys to lock; never {@code null}, always contains at least
 *         the node's own key
 */
private Set<String> keysToLockForNode(NodeKey key) {
    Set<String> keys = new TreeSet<>();
    // always the node itself
    keys.add(key.toString());
    // and potentially any binary keys it references, for which the reference documents change too
    Set<BinaryKey> binaryReferencesForNode = binaryReferencesByNodeKey.get(key);
    if (binaryReferencesForNode == null || binaryReferencesForNode.isEmpty()) {
        return keys;
    }
    DocumentTranslator translator = translator();
    for (BinaryKey binaryKey : binaryReferencesForNode) {
        keys.add(translator.keyForBinaryReferenceDocument(binaryKey.toString()));
    }
    return keys;
}

Expand Down
Expand Up @@ -542,7 +542,7 @@ public void shouldNotCorruptDataWhenConcurrentlyWritingAndQuerying() throws Exce

@Test
@FixFor( "MODE-2607" )
public void shouldBeAbleToUpdateParentNodeWithOneSessionAndRemoveChildNodeWithAnotherSession() throws Exception {
public void shouldUpdateParentAndRemoveChildWithDifferentTransactions1() throws Exception {
final String parentPath = "/parent";
final String childPath = "/parent/child";

Expand All @@ -565,7 +565,7 @@ public void shouldBeAbleToUpdateParentNodeWithOneSessionAndRemoveChildNodeWithAn

child = session.getNode(childPath);
child.remove();
session.save();//Fail
session.save();
commitTransaction();

//check that the editing worked in a new tx
Expand All @@ -575,6 +575,37 @@ public void shouldBeAbleToUpdateParentNodeWithOneSessionAndRemoveChildNodeWithAn
assertNoNode("/parent/child");
commitTransaction();
}

@Test
@FixFor( "MODE-2610" )
public void shouldUpdateParentAndRemoveChildWithDifferentTransactions2() throws Exception {
    // Tx 1: create /parent with a single /parent/child node.
    startTransaction();
    Node parentNode = session.getRootNode().addNode("parent");
    parentNode.setProperty("foo", "parent");
    Node childNode = parentNode.addNode("child");
    childNode.setProperty("foo", "child");
    session.save();
    commitTransaction();

    // Tx 2: update the parent and save, then remove the child and save again — two saves
    // inside the same user transaction.
    startTransaction();
    childNode = session.getNode("/parent/child");
    parentNode = session.getNode("/parent");
    parentNode.setProperty("foo", "bar2");
    session.save();
    childNode.remove();
    session.save();
    commitTransaction();

    // Tx 3: verify the property update survived and the child is gone.
    startTransaction();
    parentNode = session.getNode("/parent");
    assertEquals("bar2", parentNode.getProperty("foo").getString());
    assertNoNode("/parent/child");
    session.logout();
    commitTransaction();
}

private void insertAndQueryNodes(int i) {
Session session = null;
Expand Down

0 comments on commit 884530a

Please sign in to comment.