Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODE-2672 Fixes the handling of node removals spanning multiple session saves via the same user transaction #1639

Merged
merged 1 commit into from
Mar 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ protected Property parse( String line,
}
}
if (values.isEmpty()) return null;
return propertyFactory.create(name, type, values);
return propertyFactory.create(name, type, values.size() > 1 ? values : values.get(0));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -3527,7 +3528,24 @@ public String toString() {
return e.getMessage();
}
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AbstractJcrNode)) {
return false;
}
AbstractJcrNode that = (AbstractJcrNode) o;
return Objects.equals(key, that.key);
}

@Override
public int hashCode() {
return Objects.hash(key);
}

/**
* Determines whether this node, or any nodes below it, contain changes that depend on nodes that are outside of this node's
* hierarchy.
Expand Down
18 changes: 12 additions & 6 deletions modeshape-jcr/src/main/java/org/modeshape/jcr/JcrSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -1178,9 +1178,7 @@ public void save()
try {
cache().save(systemContent.cache(),
new JcrPreSave(systemContent, baseVersionKeys, originalVersionKeys, aclChangesCount()));
this.baseVersionKeys.set(null);
this.originalVersionKeys.set(null);
this.aclChangesCount.set(0);
clearState();
} catch (WrappedException e) {
Throwable cause = e.getCause();
throw (cause instanceof RepositoryException) ? (RepositoryException)cause : new RepositoryException(e.getCause());
Expand All @@ -1205,7 +1203,16 @@ public void save()
// The repository has been shutdown ...
}
}


private void clearState() {
this.cache.clear();
this.baseVersionKeys.set(null);
this.originalVersionKeys.set(null);
this.aclChangesCount.set(0);
this.jcrNodes.clear();
this.shareableNodeCache().clear();
}

/**
* Save a subset of the changes made within this session.
*
Expand Down Expand Up @@ -1281,8 +1288,7 @@ void save( AbstractJcrNode node ) throws RepositoryException {
public void refresh( boolean keepChanges ) throws RepositoryException {
checkLive();
if (!keepChanges) {
cache.clear();
aclChangesCount.set(0);
clearState();
}
// Otherwise there is nothing to do, as all persistent changes are always immediately visible to all sessions
// using that same workspace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
*/
final class JcrSharedNodeCache {

private final ConcurrentMap<NodeKey, SharedSet> sharedSets = new ConcurrentHashMap<NodeKey, SharedSet>();
private final ConcurrentMap<NodeKey, SharedSet> sharedSets = new ConcurrentHashMap<>();

protected final JcrSession session;

Expand All @@ -70,13 +70,11 @@ public JcrSession session() {
*/
public SharedSet getSharedSet( AbstractJcrNode shareableNode ) {
NodeKey shareableNodeKey = shareableNode.key();
SharedSet sharedSet = sharedSets.get(shareableNodeKey);
if (sharedSet == null) {
SharedSet newSharedSet = new SharedSet(shareableNode);
sharedSet = sharedSets.putIfAbsent(shareableNodeKey, newSharedSet);
if (sharedSet == null) sharedSet = newSharedSet;
}
return sharedSet;
return sharedSets.computeIfAbsent(shareableNodeKey, key -> new SharedSet(shareableNode));
}

protected void clear() {
sharedSets.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void removeVersion( Version versionToBeRemoved )
throws ReferentialIntegrityException, AccessDeniedException, UnsupportedRepositoryOperationException, VersionException,
RepositoryException {

assert versionToBeRemoved.getParent() == this;
assert versionToBeRemoved.getParent().equals(this);

if (versionToBeRemoved.getName().equalsIgnoreCase(JcrLexicon.ROOT_VERSION.getString())) {
//the root version should not be removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1514,33 +1514,34 @@ private void lockNodes(Collection<NodeKey> changedNodesInOrder) {
logger.debug("The keys {0} have been locked previously as part of the transaction {1}; skipping them...",
changedNodesKeys, txId);
}
} else {
// there are new nodes that we need to lock...
Set<String> newKeysToLock = new TreeSet<>(changedNodesKeys);
newKeysToLock.removeAll(lockedKeysForTx);
int retryCountOnLockTimeout = 3;
boolean locksAcquired = false;
while (!locksAcquired && retryCountOnLockTimeout > 0) {
locksAcquired = documentStore.lockDocuments(newKeysToLock);
if (locksAcquired) {
if (logger.isDebugEnabled()) {
logger.debug("Locked the nodes: {0}", newKeysToLock);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Timeout while attempting to lock keys {0}. Retrying....", newKeysToLock);
}
--retryCountOnLockTimeout;
return;
}

// there are new nodes that we need to lock...
Set<String> newKeysToLock = new TreeSet<>(changedNodesKeys);
newKeysToLock.removeAll(lockedKeysForTx);
int retryCountOnLockTimeout = 3;
boolean locksAcquired = false;
while (!locksAcquired && retryCountOnLockTimeout > 0) {
locksAcquired = documentStore.lockDocuments(newKeysToLock);
if (locksAcquired) {
if (logger.isDebugEnabled()) {
logger.debug("Locked the nodes: {0}", newKeysToLock);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Timeout while attempting to lock keys {0}. Retrying....", newKeysToLock);
}
--retryCountOnLockTimeout;
}
if (!locksAcquired) {
throw new TimeoutException(
"Timeout while attempting to lock the keys " + changedNodesKeys + " after " + retryCountOnLockTimeout +
" retry attempts.");
}
lockedKeysForTx.addAll(newKeysToLock);
}

if (!locksAcquired) {
throw new TimeoutException(
"Timeout while attempting to lock the keys " + changedNodesKeys + " after " + retryCountOnLockTimeout +
" retry attempts.");
}
lockedKeysForTx.addAll(newKeysToLock);

// now that we've locked the keys, load all of the from the document store
// note that some of the keys may be new but it's important to pass the entire set down to the document store
workspaceCache.loadFromDocumentStore(changedNodesKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsSame.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.InputStream;
import javax.jcr.Node;
import org.junit.Test;
Expand Down Expand Up @@ -127,7 +127,7 @@ public void shouldAllowSequencerToBeConfiguredWithOnlyInputPath() throws Excepti
// Now verify that the test sequencer created a node ...
Node derivedNode = getOutputNode("/foo/" + TestSequencersHolder.DERIVED_NODE_NAME);
assertNotNull(derivedNode);
assertThat(derivedNode.getParent(), is(sameInstance(foo)));
assertThat(derivedNode.getParent(), is(foo));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ public void shouldAllowCreatingShareableNodeUnderParentThatDoesNotAllowSameNameS
assertSharedSetIs(sharedNode, originalPath, sharedPath);

// Remove the node from Joe ..
minibus = session.getNode("/NewSecondArea/Joe/Type 2");
minibus.remove();
session.save();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -124,7 +125,7 @@ public Document getById( Connection connection, String id ) throws SQLException
}

@Override
public <R> List<R> load( Connection connection, List<String> ids, Function<Document, R> parser ) throws SQLException {
public <R> List<R> load(Connection connection, Collection<String> ids, Function<Document, R> parser) throws SQLException {
if (logger.isDebugEnabled()) {
logger.debug("Loading ids {0} from {1}", ids.toString(), tableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -189,37 +190,37 @@ public Document get(String key) {

@Override
public List<SchematicEntry> load(Collection<String> keys) {
List<SchematicEntry> alreadyChangedInTransaction = new ArrayList<>();
List<SchematicEntry> alreadyChangedInTransaction = Collections.emptyList();
List<String> alreadyChangedKeys = new ArrayList<>();
if (TransactionsHolder.hasActiveTransaction()) {
// there's an active transaction so we want to look at stuff which we've already written in this tx and if there
// is anything, use it
for (Iterator<String> keysIterator = keys.iterator(); keysIterator.hasNext(); ) {
String key = keysIterator.next();
Document alreadyWrittenForTx = transactionalCaches.getForWriting(key);
if (alreadyWrittenForTx != null) {
// remove the key so we don't load it again from the DB
keysIterator.remove();
alreadyChangedInTransaction.add(() -> alreadyWrittenForTx);
}
}
alreadyChangedInTransaction = keys.stream()
.map(transactionalCaches::getForWriting)
.filter(Objects::nonNull)
.map(SchematicEntry::fromDocument)
.collect(ArrayList::new, (list, schematicEntry) -> {
alreadyChangedKeys.add(schematicEntry.id());
if (TransactionalCaches.REMOVED != schematicEntry.source()) {
list.add(schematicEntry);
}
}, ArrayList::addAll);
}

List<String> keysList = new ArrayList<>(keys);

keys.removeAll(alreadyChangedKeys);
Function<Document, SchematicEntry> documentParser = document -> {
SchematicEntry entry = SchematicEntry.fromDocument(document);
String id = entry.id();
//always cache it to mark it as "existing"
transactionalCaches.putForReading(id, document);
keysList.remove(id);
return entry;
};

List<SchematicEntry> results = runWithConnection(connection -> statements.load(connection, keysList, documentParser), true);
List<SchematicEntry> results = runWithConnection(connection -> statements.load(connection, keys, documentParser), true);
results.addAll(alreadyChangedInTransaction);
// if there's an active transaction make sure we also mark all the keys which were not found in the DB as 'new'
// to prevent further DB lookups
transactionalCaches.putNew(keysList);
transactionalCaches.putNew(keys);
return results;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -96,13 +97,13 @@ public interface Statements {
* </p>
*
* @param connection a {@link Connection} instance; may not be null
* @param ids a {@link List} of ids; may not be null
* @param ids a {@link Collection} of ids; may not be null
* @param parser a {@link Function} which is used to transform or process each of documents corresponding to the given IDS;
* may not be null
* @return a {@link List} of {@code Object} instances for each of the ids which were found in the DB; never {@code null}
* @throws SQLException if the operation fails.
*/
<R> List<R> load( Connection connection, List<String> ids, Function<Document, R> parser) throws SQLException;
<R> List<R> load(Connection connection, Collection<String> ids, Function<Document, R> parser) throws SQLException;

/**
* Starts a batch update operation with the given connection.
Expand Down