Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODE-2216 Fixed the potential corruption of the workspace cache during concurrent modification of the same nodes. #1138

Merged
merged 1 commit into from Jun 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,7 +53,7 @@ protected TransactionalWorkspaceCache( WorkspaceCache sharedWorkspaceCache,
TransactionalWorkspaceCaches cacheManager,
Transaction txn ) {
// Use a new in-memory map for the transactional cache ...
super(sharedWorkspaceCache, new ConcurrentHashMap<NodeKey, CachedNode>(), null);
super(sharedWorkspaceCache, new ConcurrentHashMap<NodeKey, CachedNode>());
this.sharedWorkspaceCache = sharedWorkspaceCache;
this.txn = txn;
this.cacheManager = cacheManager;
Expand Down
Expand Up @@ -16,6 +16,7 @@
package org.modeshape.jcr.cache.document;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.infinispan.commons.api.BasicCache;
Expand Down Expand Up @@ -108,8 +109,7 @@ public WorkspaceCache( ExecutionContext context,
}

protected WorkspaceCache( WorkspaceCache original,
ConcurrentMap<NodeKey, CachedNode> cache,
ChangeBus changeBus ) {
ConcurrentMap<NodeKey, CachedNode> cache ) {
this.context = original.context;
this.documentStore = original.documentStore;
this.translator = original.translator;
Expand All @@ -124,7 +124,8 @@ protected WorkspaceCache( WorkspaceCache original,
this.nodesByKey = cache;
this.systemChangeNotifier = null;
this.nonSystemChangeNotifier = null;
this.changeBus = changeBus;
//the change bus is not copied on purpose because this ctr should only be used for creating lightweight, "transient" instances
this.changeBus = null;
}

public void setMinimumStringLengthForBinaryStorage( long largeValueSize ) {
Expand Down Expand Up @@ -385,6 +386,26 @@ public String toString() {
return workspaceName;
}

/**
* Returns a workspace cache which has the latest persisted information read from Infinispan for the given nodes.
* After reading each node from Infinispan, that node will also be updated/inserted into *this* workspace cache.
*
* @param nodeKeys an {@code Iterable} of {@code NodeKey}; may not be null
* @return a workspace cache instance which only contains the latest persisted information for the requested nodes.
*/
protected WorkspaceCache persistedCache(Iterable<NodeKey> nodeKeys) {
ConcurrentHashMap<NodeKey, CachedNode> nodes = new ConcurrentHashMap<>();
for (NodeKey nodeKey : nodeKeys) {
Document nodeData = documentFor(nodeKey);
if (nodeData != null) {
CachedNode persistedNode = new LazyCachedNode(nodeKey, nodeData);
nodes.put(nodeKey, persistedNode);
this.nodesByKey.put(nodeKey, persistedNode);
}
}
return new WorkspaceCache(this, nodes);
}

protected final class SystemChangeNotifier implements ChangeSetListener {
private final String systemWorkspaceName;

Expand Down

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions modeshape-jcr/src/test/java/org/modeshape/jcr/ConcurrentWriteTest.java
Expand Up @@ -16,12 +16,22 @@
package org.modeshape.jcr;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -37,6 +47,7 @@
import org.modeshape.common.annotation.Immutable;
import org.modeshape.common.annotation.ThreadSafe;
import org.modeshape.common.util.CheckArg;
import org.modeshape.common.util.FileUtil;
import org.modeshape.common.util.StringUtil;
import org.modeshape.jcr.api.JcrTools;

Expand Down Expand Up @@ -198,6 +209,98 @@ public void run( Session session ) throws Exception {
verify(new NumberOfChildren(2, "testRoot"));
}

@Test
@FixFor( "MODE-2216" )
public void shouldMoveFileAndFoldersConcurrently() throws Exception {
if (repository != null) {
try {
TestingUtil.killRepositories(repository);
} finally {
repository = null;
config = null;
}
}

FileUtil.delete("target/move_repository");

int threadCount = 50;
String sourcePath = "/source";
String destPath = "/dest";

//this will import initial content into the source folder (see above)
repository = TestingUtil.startRepositoryWithConfig("config/repo-config-move.json");
Session session = repository.login();
NodeIterator sourceNodes = session.getNode(sourcePath).getNodes();
long expectedMoveCount = sourceNodes.getSize();

final List<Callable<String>> tasks = new ArrayList<Callable<String>>();
while (sourceNodes.hasNext()) {
final Node node = sourceNodes.nextNode();
final MoveNodeTask task = new MoveNodeTask(node.getIdentifier(), destPath);
tasks.add(task);
}

ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<Future<String>> futures = new ArrayList<Future<String>>();
for (Callable<String> task : tasks) {
futures.add(executorService.submit(task));
}
Set<String> movedNodeIds = new HashSet<String>();
for (Future<String> future : futures) {
movedNodeIds.add(future.get());
}

for (String id : movedNodeIds) {
Node node = session.getNodeByIdentifier(id);
assertNotNull("The document with " + id + " was not found!", node);
assertTrue("The document was not moved to destination folder!", node.getPath().startsWith(destPath));
}

NodeIterator destNodeIterator = session.getNode(destPath).getNodes();
while (destNodeIterator.hasNext()) {
assertNotNull("Node could be read", destNodeIterator.nextNode());
}

assertThat("Incorrect number of nodes moved", (long)movedNodeIds.size(), is(expectedMoveCount));
assertFalse("The source parent is not empty", session.getNode(sourcePath).getNodes().hasNext());
}

private class MoveNodeTask implements Callable<String> {

private String sourceId;
private String destinationPath;

public MoveNodeTask( final String sourceId, final String destinationPath ) {
this.sourceId = sourceId;
this.destinationPath = destinationPath;
}

@Override
public String call() throws Exception {
JcrSession session = repository.login();
final Node item = session.getNodeByIdentifier(sourceId);
String destAbsPath = destinationPath + "/" + item.getName();
String sourceAbsPath = item.getPath();
try {
if (print) {
System.out.println(Thread.currentThread().getName() + String.format(" Moving node from '%s' to '%s'", sourceAbsPath, destAbsPath));
}
session.move(item.getPath(), destAbsPath);
session.save();
session.save();
return item.getIdentifier();
} catch (Exception e) {
if (print) {
System.out.println(Thread.currentThread().getName() + String.format(" Exception moving node from '%s' to '%s'", sourceAbsPath, destAbsPath));
}
throw e;
} finally {
session.logout();
}

}
}

/**
* Method that can be called within a test method to run the supplied {@link Operation} just once in one thread. This is often
* useful for initializing content.
Expand Down
Expand Up @@ -1329,7 +1329,8 @@ public void shouldClearStartupProblemsOnRestart() throws Exception {
public void shouldReturnStartupProblemsAfterStarting() throws Exception {
shutdownDefaultRepository();
RepositoryConfiguration config = RepositoryConfiguration.read(getClass().getClassLoader()
.getResourceAsStream("config/repo-config-with-startup-problems.json"),
.getResourceAsStream(
"config/repo-config-with-startup-problems.json"),
"Deprecated config");
repository = new JcrRepository(config);
repository.start();
Expand All @@ -1344,7 +1345,8 @@ public void shouldStartupWithJournalingEnabled() throws Exception {
FileUtil.delete("target/journal");
shutdownDefaultRepository();
RepositoryConfiguration config = RepositoryConfiguration.read(getClass().getClassLoader()
.getResourceAsStream("config/repo-config-journaling.json"),
.getResourceAsStream(
"config/repo-config-journaling.json"),
"Deprecated config");
repository = new JcrRepository(config);
repository.start();
Expand Down
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
xmlns="urn:infinispan:config:6.0">

<global>
<globalJmxStatistics enabled="false" allowDuplicateDomains="true"/>
</global>

<namedCache name="persistentRepository">
<transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.DummyTransactionManagerLookup"
transactionMode="TRANSACTIONAL"
lockingMode="PESSIMISTIC"/>
<persistence
passivation="false">
<singleFile
shared="false"
preload="false"
fetchPersistentState="false"
purgeOnStartup="false"
location="target/move_repository/store">
</singleFile>
</persistence>
</namedCache>
</infinispan>
14 changes: 14 additions & 0 deletions modeshape-jcr/src/test/resources/config/repo-config-move.json
@@ -0,0 +1,14 @@
{
"name": "MoveRepository",
"workspaces" : {
"default" : "default",
"allowCreation" : true,
"initialContent" : {
"default" : "data/move-initial-data.xml"
}
},
"storage": {
"cacheName": "persistentRepository",
"cacheConfiguration": "config/infinispan-persistent-move.xml"
}
}