HHH-10185 In nonstrict-read-write mode the remove may not be applied
rvansa authored and galderz committed Nov 26, 2015
1 parent 9a9fb43 commit cc61914
Showing 5 changed files with 123 additions and 15 deletions.
@@ -32,6 +32,7 @@
*/
public class NonStrictAccessDelegate implements AccessDelegate {
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
private static final boolean trace = log.isTraceEnabled();

private final BaseTransactionalDataRegion region;
private final AdvancedCache cache;
@@ -90,10 +91,17 @@ public boolean putFromLoad(SessionImplementor session, Object key, Object value,
Object oldVersion = getVersion(prev);
if (oldVersion != null) {
if (versionComparator.compare(version, oldVersion) <= 0) {
if (trace) {
log.tracef("putFromLoad not executed since version(%s) <= oldVersion(%s)", version, oldVersion);
}
return false;
}
}
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
if (trace) {
log.tracef("putFromLoad not executed since tx started at %d and entry was invalidated at %d",
txTimestamp, ((VersionedEntry) prev).getTimestamp());
}
return false;
}
}
@@ -119,11 +127,13 @@ public boolean update(SessionImplementor session, Object key, Object value, Obje

@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
Object value = cache.get(key);
Object version = getVersion(value);
// there's no 'afterRemove', so we have to use our own synchronization
// the API does not provide version of removed item but we can't load it from the cache
// as that would be prone to race conditions - if the entry was updated in the meantime
// the remove could be discarded and we would end up with stale record
// See VersionedTest#testCollectionUpdate for such situation
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version);
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
}
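The new comment in remove() above explains why the version can no longer be read from the cache at removal time: a concurrent update can bump the version between that read and the post-commit write, and a version-guarded write keyed to the captured version is then discarded, leaving the stale record in the cache (see VersionedTest#testCollectionUpdate below). A minimal, self-contained sketch of that race, using illustrative names rather than the Hibernate/Infinispan API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only -- a toy versioned cache, not the Infinispan API.
// The Integer value stands in for the version stored with each cache entry.
public class StaleRemoveSketch {
    static final Map<String, Integer> cache = new ConcurrentHashMap<>();

    // Mimics a version-guarded write: it is discarded when the cached version
    // is already equal to or newer than the version the write carries.
    static boolean writeIfNewer(String key, Integer version) {
        Integer old = cache.get(key);
        if (old != null && version != null && version <= old) {
            return false; // discarded, as in "version(%s) <= oldVersion(%s)" above
        }
        cache.put(key, version);
        return true;
    }

    public static void main(String[] args) {
        cache.put("item", 1);
        Integer captured = cache.get("item"); // old remove(): read the version up front
        writeIfNewer("item", 2);              // concurrent update commits version 2
        boolean applied = writeIfNewer("item", captured); // removal still carries version 1
        System.out.println("remove applied? " + applied + ", cache=" + cache);
        // Prints "remove applied? false, cache={item=2}" -- the stale record survives.
    }
}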

@@ -20,24 +20,17 @@
public class RemovalSynchronization extends InvocationAfterCompletion {
private final BaseTransactionalDataRegion region;
private final Object key;
private final Object version;

public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) {
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key) {
super(tc, cache, requiresTransaction);
this.region = region;
this.key = key;
this.version = version;
}

@Override
protected void invoke(boolean success, AdvancedCache cache) {
if (success) {
if (version == null) {
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
else {
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
}
}
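With the version argument gone, the synchronization above always writes an unversioned VersionedEntry stamped with region.nextTimestamp() and retained for the tombstone expiration period, so a concurrent update can no longer cause the removal to be discarded on version grounds. Stale loads are instead rejected by the timestamp guard added to putFromLoad in the first hunk. A minimal, self-contained sketch of that rule (illustrative names only, not the real classes):

// Illustrative sketch only -- a toy tombstone check mirroring the timestamp
// guard in putFromLoad; not the actual Hibernate or Infinispan classes.
public class TombstoneGuardSketch {
    static final class Tombstone {
        final long timestamp;
        Tombstone(long timestamp) { this.timestamp = timestamp; }
    }

    // A transaction that started at txTimestamp may repopulate the key only if
    // it began strictly after the removal was recorded.
    static boolean mayPutFromLoad(Object cached, long txTimestamp) {
        if (cached instanceof Tombstone) {
            return txTimestamp > ((Tombstone) cached).timestamp;
        }
        return true;
    }

    public static void main(String[] args) {
        Tombstone removedAt100 = new Tombstone(100);
        System.out.println(mayPutFromLoad(removedAt100, 90));  // false: older transaction, rejected
        System.out.println(mayPutFromLoad(removedAt100, 100)); // false: txTimestamp <= timestamp, rejected
        System.out.println(mayPutFromLoad(removedAt100, 150)); // true: newer transaction may cache again
    }
}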
@@ -18,12 +18,9 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.NullValueConverter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.Comparator;
import java.util.Set;
import java.util.UUID;

/**
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
@@ -73,6 +73,8 @@ protected void startUp() {

@Before
public void insertAndClearCache() throws Exception {
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
Item item = new Item("my item", "Original item");
withTxSession(s -> s.persist(item));
entityCache.clear();
@@ -3,12 +3,19 @@
import org.hibernate.PessimisticLockException;
import org.hibernate.Session;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.functional.entities.OtherItem;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.ByRef;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.junit.Test;

import javax.transaction.Synchronization;
@@ -17,10 +24,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

@@ -41,6 +50,11 @@ public List<Object[]> getParameters() {
return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED);
}

@Override
protected boolean getUseQueryCache() {
return false;
}

@Test
public void testTwoRemoves() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
@@ -220,6 +234,98 @@ public void testEvictUpdateExpiration() throws Exception {
assertSingleCacheEntry();
}

@Test
public void testCollectionUpdate() throws Exception {
// the first insert puts VersionedEntry(null, null, timestamp), so we have to wait a while to cache the entry
TIME_SERVICE.advance(1);

withTxSession(s -> {
Item item = s.load(Item.class, itemId);
OtherItem otherItem = new OtherItem();
otherItem.setName("Other 1");
s.persist(otherItem);
item.addOtherItem(otherItem);
});
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
Set<OtherItem> otherItems = item.getOtherItems();
assertFalse(otherItems.isEmpty());
otherItems.remove(otherItems.iterator().next());
});

AdvancedCache collectionCache = ((BaseTransactionalDataRegion) sessionFactory().getSecondLevelCacheRegion(Item.class.getName() + ".otherItems")).getCache();
CountDownLatch putFromLoadLatch = new CountDownLatch(1);
AtomicBoolean committing = new AtomicBoolean(false);
CollectionUpdateTestInterceptor collectionUpdateTestInterceptor = new CollectionUpdateTestInterceptor(putFromLoadLatch);
AnotherCollectionUpdateTestInterceptor anotherInterceptor = new AnotherCollectionUpdateTestInterceptor(putFromLoadLatch, committing);
collectionCache.addInterceptor(collectionUpdateTestInterceptor, collectionCache.getInterceptorChain().size() - 1);
collectionCache.addInterceptor(anotherInterceptor, 0);

TIME_SERVICE.advance(1);
Future<Boolean> addFuture = executor.submit(() -> withTxSessionApply(s -> {
collectionUpdateTestInterceptor.updateLatch.await();
Item item = s.load(Item.class, itemId);
OtherItem otherItem = new OtherItem();
otherItem.setName("Other 2");
s.persist(otherItem);
item.addOtherItem(otherItem);
committing.set(true);
return true;
}));

Future<Boolean> readFuture = executor.submit(() -> withTxSessionApply(s -> {
Item item = s.load(Item.class, itemId);
assertTrue(item.getOtherItems().isEmpty());
return true;
}));

addFuture.get();
readFuture.get();
collectionCache.removeInterceptor(CollectionUpdateTestInterceptor.class);
collectionCache.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class);

withTxSession(s -> assertFalse(s.load(Item.class, itemId).getOtherItems().isEmpty()));
}

private class CollectionUpdateTestInterceptor extends BaseCustomInterceptor {
final AtomicBoolean firstPutFromLoad = new AtomicBoolean(true);
final CountDownLatch putFromLoadLatch;
final CountDownLatch updateLatch = new CountDownLatch(1);

public CollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch) {
this.putFromLoadLatch = putFromLoadLatch;
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
if (firstPutFromLoad.compareAndSet(true, false)) {
updateLatch.countDown();
putFromLoadLatch.await();
}
}
return super.visitPutKeyValueCommand(ctx, command);
}
}

private class AnotherCollectionUpdateTestInterceptor extends BaseCustomInterceptor {
final CountDownLatch putFromLoadLatch;
final AtomicBoolean committing;

public AnotherCollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch, AtomicBoolean committing) {
this.putFromLoadLatch = putFromLoadLatch;
this.committing = committing;
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (committing.get() && !command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
putFromLoadLatch.countDown();
}
return super.visitPutKeyValueCommand(ctx, command);
}
}

protected void assertSingleEmpty() {
Map contents = Caches.entrySet(entityCache).toMap();
Object value;
