Skip to content

Commit

Permalink
HHH-10083 Support replicated and distributed caches
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa authored and galderz committed Sep 16, 2015
1 parent 39e3dbb commit 5edcf26
Show file tree
Hide file tree
Showing 59 changed files with 2,378 additions and 693 deletions.
7 changes: 7 additions & 0 deletions hibernate-infinispan/hibernate-infinispan.gradle
Expand Up @@ -59,6 +59,13 @@ test {
systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = '2lc-test-tcp.xml' systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = '2lc-test-tcp.xml'
// systemProperties['log4j.configuration'] = 'file:/log4j/log4j-infinispan.xml' // systemProperties['log4j.configuration'] = 'file:/log4j/log4j-infinispan.xml'
enabled = true enabled = true
// Without this I have trouble running specific test using --tests switch
doFirst {
filter.includePatterns.each {
include "${it.replaceAll('\\.', "\\${File.separator}")}.class"
}
filter.setIncludePatterns('*')
}
} }


task packageTests(type: Jar) { task packageTests(type: Jar) {
Expand Down
@@ -0,0 +1,52 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;

import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.UUID;

/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class FutureUpdateSynchronization extends InvocationAfterCompletion {
private static final Log log = LogFactory.getLog( FutureUpdateSynchronization.class );

private final UUID uuid = UUID.randomUUID();
private final Object key;
private final Object value;

public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, Object key, Object value) {
super(tc, cache, requiresTransaction);
this.key = key;
this.value = value;
}

public UUID getUuid() {
return uuid;
}

@Override
protected void invoke(boolean success, AdvancedCache cache) {
// Exceptions in #afterCompletion() are silently ignored, since the transaction
// is already committed in DB. However we must not return until we update the cache.
for (;;) {
try {
cache.put(key, new FutureUpdate(uuid, success ? value : null));
return;
}
catch (Exception e) {
log.error("Failure updating cache in afterCompletion, will retry", e);
}
}
}
}
Expand Up @@ -28,15 +28,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
protected final PutFromLoadValidator putValidator; protected final PutFromLoadValidator putValidator;
protected final AdvancedCache<Object, Object> writeCache; protected final AdvancedCache<Object, Object> writeCache;


public static InvalidationCacheAccessDelegate create(BaseRegion region, PutFromLoadValidator validator) {
if (region.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional()) {
return new TxInvalidationCacheAccessDelegate(region, validator);
}
else {
return new NonTxInvalidationCacheAccessDelegate(region, validator);
}
}

/** /**
* Create a new transactional access delegate instance. * Create a new transactional access delegate instance.
* *
Expand Down
Expand Up @@ -13,12 +13,12 @@
* *
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class Synchronization implements javax.transaction.Synchronization { public class InvalidationSynchronization implements javax.transaction.Synchronization {
public final UUID uuid = UUID.randomUUID(); public final UUID uuid = UUID.randomUUID();
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor; private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private final Object[] keys; private final Object[] keys;


public Synchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) { public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor; this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
this.keys = keys; this.keys = keys;
} }
Expand Down
Expand Up @@ -20,7 +20,7 @@
* Non-transactional counterpart of {@link TxPutFromLoadInterceptor}. * Non-transactional counterpart of {@link TxPutFromLoadInterceptor}.
* Invokes {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} for each invalidation from * Invokes {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} for each invalidation from
* remote node ({@link BeginInvalidationCommand} and sends {@link EndInvalidationCommand} after the transaction * remote node ({@link BeginInvalidationCommand} and sends {@link EndInvalidationCommand} after the transaction
* is complete, with help of {@link Synchronization}; * is complete, with help of {@link InvalidationSynchronization};
* *
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
Expand Down
Expand Up @@ -617,7 +617,7 @@ public Object registerRemoteInvalidations(Object[] keys) {
if (trace) { if (trace) {
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys)); log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
} }
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys); InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync); transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
return sync.uuid; return sync.uuid;
} }
Expand Down
@@ -0,0 +1,175 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;

import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.concurrent.TimeUnit;

/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneAccessDelegate implements AccessDelegate {
private static final Log log = LogFactory.getLog( TombstoneAccessDelegate.class );

protected final BaseTransactionalDataRegion region;
protected final AdvancedCache cache;
protected final AdvancedCache writeCache;
protected final AdvancedCache asyncWriteCache;
protected final AdvancedCache putFromLoadCache;
protected final boolean requiresTransaction;

public TombstoneAccessDelegate(BaseTransactionalDataRegion region) {
this.region = region;
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
this.asyncWriteCache = Caches.asyncWriteCache(cache, Flag.IGNORE_RETURN_VALUES);
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("For tombstone-based caching, invalidation cache is not allowed.");
}
if (configuration.transaction().transactionMode().isTransactional()) {
throw new IllegalArgumentException("Currently transactional caches are not supported.");
}
requiresTransaction = configuration.transaction().transactionMode().isTransactional()
&& !configuration.transaction().autoCommit();
}

@Override
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
if (txTimestamp < region.getLastRegionInvalidation() ) {
return null;
}
Object value = cache.get(key);
if (value instanceof Tombstone) {
return null;
}
else if (value instanceof FutureUpdate) {
return ((FutureUpdate) value).getValue();
}
else {
return value;
}
}

@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
return putFromLoad(session, key, value, txTimestamp, version, false);
}

@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException {
long lastRegionInvalidation = region.getLastRegionInvalidation();
if (txTimestamp < lastRegionInvalidation) {
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation);
return false;
}
if (minimalPutOverride) {
Object prev = cache.get(key);
if (prev instanceof Tombstone) {
Tombstone tombstone = (Tombstone) prev;
long lastTimestamp = tombstone.getLastTimestamp();
if (txTimestamp <= lastTimestamp) {
log.tracef("putFromLoad not executed since tx started at %d, before last invalidation finished = %d", txTimestamp, lastTimestamp);
return false;
}
}
else if (prev != null) {
log.tracef("putFromLoad not executed since cache contains %s", prev);
return false;
}
}
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
// when it is present in the container. TombstoneCallInterceptor will deal with this.
putFromLoadCache.put(key, new TombstoneUpdate(session.getTimestamp(), value));
return true;
}

@Override
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
write(session, key, value);
return true;
}

@Override
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
write(session, key, value);
return true;
}

protected void write(SessionImplementor session, Object key, Object value) {
TransactionCoordinator tc = session.getTransactionCoordinator();
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, writeCache, requiresTransaction, key, value);
// FutureUpdate is handled in TombstoneCallInterceptor
writeCache.put(key, new FutureUpdate(sync.getUuid(), null), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
tc.getLocalSynchronizations().registerSynchronization(sync);
}

@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
TombstoneSynchronization sync = new TombstoneSynchronization(transactionCoordinator, asyncWriteCache, requiresTransaction, region, key);
Tombstone tombstone = new Tombstone(sync.getUuid(), session.getTimestamp() + region.getTombstoneExpiration(), false);
writeCache.put(key, tombstone, region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
}

@Override
public void removeAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}

@Override
public void evict(Object key) throws CacheException {
writeCache.put(key, TombstoneUpdate.EVICT);
}

@Override
public void evictAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}

@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
}

@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
return false;
}

@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
return false;
}
}

0 comments on commit 5edcf26

Please sign in to comment.