Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HHH-10101 Implement nonstrict-read-write mode in Infinispan 2LC
* requires non-transactional cache in repl/dist/local mode and versioned entities
- Loading branch information
Showing
49 changed files
with
1,548 additions
and
764 deletions.
There are no files selected for viewing
188 changes: 188 additions & 0 deletions
188
...finispan/src/main/java/org/hibernate/cache/infinispan/access/NonStrictAccessDelegate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
/* | ||
* Hibernate, Relational Persistence for Idiomatic Java | ||
* | ||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later. | ||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>. | ||
*/ | ||
package org.hibernate.cache.infinispan.access; | ||
|
||
import org.hibernate.cache.CacheException; | ||
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; | ||
import org.hibernate.cache.infinispan.util.Caches; | ||
import org.hibernate.cache.infinispan.util.VersionedEntry; | ||
import org.hibernate.cache.spi.access.SoftLock; | ||
import org.hibernate.cache.spi.entry.CacheEntry; | ||
import org.hibernate.engine.spi.SessionImplementor; | ||
import org.hibernate.resource.transaction.TransactionCoordinator; | ||
import org.infinispan.AdvancedCache; | ||
import org.infinispan.configuration.cache.Configuration; | ||
import org.infinispan.context.Flag; | ||
import org.infinispan.util.logging.Log; | ||
import org.infinispan.util.logging.LogFactory; | ||
|
||
import java.util.Comparator; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Access delegate that relaxes the consistency a bit: stale reads are prohibited only after the transaction | ||
* commits. This should also be able to work with async caches, and that would allow the replication delay | ||
* even after the commit. | ||
* | ||
* @author Radim Vansa <rvansa@redhat.com> | ||
*/ | ||
public class NonStrictAccessDelegate implements AccessDelegate { | ||
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class ); | ||
|
||
private final BaseTransactionalDataRegion region; | ||
private final AdvancedCache cache; | ||
private final AdvancedCache writeCache; | ||
private final AdvancedCache putFromLoadCache; | ||
private final Comparator versionComparator; | ||
|
||
|
||
public NonStrictAccessDelegate(BaseTransactionalDataRegion region) { | ||
this.region = region; | ||
this.cache = region.getCache(); | ||
this.writeCache = Caches.ignoreReturnValuesCache(cache); | ||
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY ); | ||
Configuration configuration = cache.getCacheConfiguration(); | ||
if (configuration.clustering().cacheMode().isInvalidation()) { | ||
throw new IllegalArgumentException("Nonstrict-read-write mode cannot use invalidation."); | ||
} | ||
if (configuration.transaction().transactionMode().isTransactional()) { | ||
throw new IllegalArgumentException("Currently transactional caches are not supported."); | ||
} | ||
this.versionComparator = region.getCacheDataDescription().getVersionComparator(); | ||
if (versionComparator == null) { | ||
throw new IllegalArgumentException("This strategy requires versioned entities/collections but region " + region.getName() + " contains non-versioned data!"); | ||
} | ||
} | ||
|
||
@Override | ||
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException { | ||
if (txTimestamp < region.getLastRegionInvalidation() ) { | ||
return null; | ||
} | ||
Object value = cache.get(key); | ||
if (value instanceof VersionedEntry) { | ||
return ((VersionedEntry) value).getValue(); | ||
} | ||
return value; | ||
} | ||
|
||
@Override | ||
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) { | ||
return putFromLoad(session, key, value, txTimestamp, version, false); | ||
} | ||
|
||
@Override | ||
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException { | ||
long lastRegionInvalidation = region.getLastRegionInvalidation(); | ||
if (txTimestamp < lastRegionInvalidation) { | ||
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation); | ||
return false; | ||
} | ||
assert version != null; | ||
|
||
if (minimalPutOverride) { | ||
Object prev = cache.get(key); | ||
if (prev != null) { | ||
Object oldVersion = getVersion(prev); | ||
if (oldVersion != null) { | ||
if (versionComparator.compare(version, oldVersion) <= 0) { | ||
return false; | ||
} | ||
} | ||
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) { | ||
return false; | ||
} | ||
} | ||
} | ||
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context | ||
// when it is present in the container. TombstoneCallInterceptor will deal with this. | ||
if (!(value instanceof CacheEntry)) { | ||
value = new VersionedEntry(value, version, txTimestamp); | ||
} | ||
putFromLoadCache.put(key, value); | ||
return true; | ||
} | ||
|
||
@Override | ||
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException { | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException { | ||
return false; | ||
} | ||
|
||
@Override | ||
public void remove(SessionImplementor session, Object key) throws CacheException { | ||
Object value = cache.get(key); | ||
Object version = getVersion(value); | ||
// there's no 'afterRemove', so we have to use our own synchronization | ||
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator(); | ||
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version); | ||
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync); | ||
} | ||
|
||
@Override | ||
public void removeAll() throws CacheException { | ||
region.beginInvalidation(); | ||
try { | ||
Caches.broadcastEvictAll(cache); | ||
} | ||
finally { | ||
region.endInvalidation(); | ||
} | ||
} | ||
|
||
@Override | ||
public void evict(Object key) throws CacheException { | ||
writeCache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); | ||
} | ||
|
||
@Override | ||
public void evictAll() throws CacheException { | ||
region.beginInvalidation(); | ||
try { | ||
Caches.broadcastEvictAll(cache); | ||
} | ||
finally { | ||
region.endInvalidation(); | ||
} | ||
} | ||
|
||
@Override | ||
public void unlockItem(SessionImplementor session, Object key) throws CacheException { | ||
} | ||
|
||
@Override | ||
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) { | ||
writeCache.put(key, getVersioned(value, version, session.getTimestamp())); | ||
return true; | ||
} | ||
|
||
@Override | ||
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) { | ||
writeCache.put(key, getVersioned(value, currentVersion, session.getTimestamp())); | ||
return true; | ||
} | ||
|
||
protected Object getVersion(Object value) { | ||
if (value instanceof CacheEntry) { | ||
return ((CacheEntry) value).getVersion(); | ||
} | ||
else if (value instanceof VersionedEntry) { | ||
return ((VersionedEntry) value).getVersion(); | ||
} | ||
return null; | ||
} | ||
|
||
protected Object getVersioned(Object value, Object version, long timestamp) { | ||
assert value != null; | ||
assert version != null; | ||
return new VersionedEntry(value, version, timestamp); | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
...nfinispan/src/main/java/org/hibernate/cache/infinispan/access/RemovalSynchronization.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Hibernate, Relational Persistence for Idiomatic Java | ||
* | ||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later. | ||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>. | ||
*/ | ||
package org.hibernate.cache.infinispan.access; | ||
|
||
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; | ||
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion; | ||
import org.hibernate.cache.infinispan.util.VersionedEntry; | ||
import org.hibernate.resource.transaction.TransactionCoordinator; | ||
import org.infinispan.AdvancedCache; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* @author Radim Vansa <rvansa@redhat.com> | ||
*/ | ||
public class RemovalSynchronization extends InvocationAfterCompletion { | ||
private final BaseTransactionalDataRegion region; | ||
private final Object key; | ||
private final Object version; | ||
|
||
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) { | ||
super(tc, cache, requiresTransaction); | ||
this.region = region; | ||
this.key = key; | ||
this.version = version; | ||
} | ||
|
||
@Override | ||
protected void invoke(boolean success, AdvancedCache cache) { | ||
if (success) { | ||
if (version == null) { | ||
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); | ||
} | ||
else { | ||
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
} | ||
} |
160 changes: 160 additions & 0 deletions
160
...inispan/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* Hibernate, Relational Persistence for Idiomatic Java | ||
* | ||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later. | ||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>. | ||
*/ | ||
package org.hibernate.cache.infinispan.access; | ||
|
||
import org.hibernate.cache.infinispan.util.VersionedEntry; | ||
import org.infinispan.AdvancedCache; | ||
import org.infinispan.commands.read.SizeCommand; | ||
import org.infinispan.commands.write.PutKeyValueCommand; | ||
import org.infinispan.commons.util.CloseableIterable; | ||
import org.infinispan.container.entries.CacheEntry; | ||
import org.infinispan.container.entries.MVCCEntry; | ||
import org.infinispan.context.Flag; | ||
import org.infinispan.context.InvocationContext; | ||
import org.infinispan.factories.annotations.Inject; | ||
import org.infinispan.filter.NullValueConverter; | ||
import org.infinispan.interceptors.CallInterceptor; | ||
import org.infinispan.util.logging.Log; | ||
import org.infinispan.util.logging.LogFactory; | ||
|
||
import java.util.Comparator; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
|
||
/** | ||
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate} | ||
* and {@link org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion} | ||
* | ||
* The behaviour here also breaks notifications, which are not used for 2LC caches. | ||
* | ||
* @author Radim Vansa <rvansa@redhat.com> | ||
*/ | ||
public class VersionedCallInterceptor extends CallInterceptor { | ||
private final Comparator<Object> versionComparator; | ||
private AdvancedCache cache; | ||
|
||
public VersionedCallInterceptor(Comparator<Object> versionComparator) { | ||
this.versionComparator = versionComparator; | ||
} | ||
|
||
@Inject | ||
public void injectDependencies(AdvancedCache cache) { | ||
this.cache = cache; | ||
} | ||
|
||
@Override | ||
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { | ||
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(command.getKey()); | ||
if (e == null) { | ||
return null; | ||
} | ||
|
||
Object oldValue = e.getValue(); | ||
Object oldVersion = null; | ||
long oldTimestamp = Long.MIN_VALUE; | ||
if (oldValue instanceof VersionedEntry) { | ||
oldVersion = ((VersionedEntry) oldValue).getVersion(); | ||
oldTimestamp = ((VersionedEntry) oldValue).getTimestamp(); | ||
oldValue = ((VersionedEntry) oldValue).getValue(); | ||
} | ||
else if (oldValue instanceof org.hibernate.cache.spi.entry.CacheEntry) { | ||
oldVersion = ((org.hibernate.cache.spi.entry.CacheEntry) oldValue).getVersion(); | ||
} | ||
|
||
Object newValue = command.getValue(); | ||
Object newVersion = null; | ||
long newTimestamp = Long.MIN_VALUE; | ||
Object actualNewValue = newValue; | ||
boolean isRemoval = false; | ||
if (newValue instanceof VersionedEntry) { | ||
VersionedEntry ve = (VersionedEntry) newValue; | ||
newVersion = ve.getVersion(); | ||
newTimestamp = ve.getTimestamp(); | ||
if (ve.getValue() == null) { | ||
isRemoval = true; | ||
} | ||
else if (ve.getValue() instanceof org.hibernate.cache.spi.entry.CacheEntry) { | ||
actualNewValue = ve.getValue(); | ||
} | ||
} | ||
else if (newValue instanceof org.hibernate.cache.spi.entry.CacheEntry) { | ||
newVersion = ((org.hibernate.cache.spi.entry.CacheEntry) newValue).getVersion(); | ||
} | ||
|
||
if (newVersion == null) { | ||
// eviction or post-commit removal: we'll store it with given timestamp | ||
setValue(e, newValue); | ||
return null; | ||
} | ||
if (oldVersion == null) { | ||
assert oldValue == null || oldTimestamp != Long.MIN_VALUE; | ||
if (newTimestamp == Long.MIN_VALUE) { | ||
// remove, knowing the version | ||
setValue(e, newValue); | ||
} | ||
else if (newTimestamp <= oldTimestamp) { | ||
// either putFromLoad or regular update/insert - in either case this update might come | ||
// when it was evicted/region-invalidated. In both cases, with old timestamp we'll leave | ||
// the invalid value | ||
assert oldValue == null; | ||
} | ||
else { | ||
setValue(e, newValue); | ||
} | ||
return null; | ||
} | ||
int compareResult = versionComparator.compare(newVersion, oldVersion); | ||
if (isRemoval && compareResult >= 0) { | ||
setValue(e, newValue); | ||
} | ||
else if (compareResult > 0) { | ||
setValue(e, actualNewValue); | ||
} | ||
return null; | ||
} | ||
|
||
private Object setValue(MVCCEntry e, Object value) { | ||
if (e.isRemoved()) { | ||
e.setRemoved(false); | ||
e.setCreated(true); | ||
e.setValid(true); | ||
} | ||
else { | ||
e.setChanged(true); | ||
} | ||
return e.setValue(value); | ||
} | ||
|
||
private void removeValue(MVCCEntry e) { | ||
e.setRemoved(true); | ||
e.setChanged(true); | ||
e.setCreated(false); | ||
e.setValid(false); | ||
e.setValue(null); | ||
} | ||
|
||
@Override | ||
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable { | ||
Set<Flag> flags = command.getFlags(); | ||
int size = 0; | ||
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null); | ||
// In non-transactional caches we don't care about context | ||
CloseableIterable<CacheEntry<Object, Void>> iterable = decoratedCache | ||
.filterEntries(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).converter(NullValueConverter.getInstance()); | ||
try { | ||
for (CacheEntry<Object, Void> entry : iterable) { | ||
if (size++ == Integer.MAX_VALUE) { | ||
return Integer.MAX_VALUE; | ||
} | ||
} | ||
} | ||
finally { | ||
iterable.close(); | ||
} | ||
return size; | ||
} | ||
} |
Oops, something went wrong.