Skip to content

Commit

Permalink
HHH-11339 Use UnorderedDistributionInterceptor for async replication
Browse files Browse the repository at this point in the history
(cherry picked from commit f598075)
  • Loading branch information
rvansa authored and gbadner committed Jul 19, 2017
1 parent 7cf3d2e commit f2f4c1a
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 98 deletions.
Expand Up @@ -27,14 +27,13 @@ public class FutureUpdateSynchronization extends InvocationAfterCompletion {
private final Object value;
private final BaseTransactionalDataRegion region;
private final long sessionTimestamp;
private final AdvancedCache localCache;
private final AdvancedCache asyncCache;
private final AdvancedCache cache;

public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction,
Object key, Object value, BaseTransactionalDataRegion region, long sessionTimestamp) {

public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache localCache, AdvancedCache asyncCache,
boolean requiresTransaction, Object key, Object value, BaseTransactionalDataRegion region, long sessionTimestamp) {
super(tc, requiresTransaction);
this.localCache = localCache;
this.asyncCache = asyncCache;
this.cache = cache;
this.key = key;
this.value = value;
this.region = region;
Expand All @@ -57,13 +56,7 @@ protected void invoke(boolean success) {
FutureUpdate futureUpdate = new FutureUpdate(uuid, region.nextTimestamp(), success ? this.value : null);
for (;;) {
try {
// Similar to putFromLoad, we have to update this node synchronously because after transaction
// is committed it is expected that we'll retrieve cached instance until next invalidation,
// but the replication this node -> primary -> this node can take a while
// We need to first execute the async update and then local one, because if we're on the primary
// owner the local future update, would fail the async one.
asyncCache.put(key, futureUpdate);
localCache.put(key, futureUpdate);
cache.put(key, futureUpdate);
return;
}
catch (Exception e) {
Expand Down
Expand Up @@ -7,10 +7,11 @@
package org.hibernate.cache.infinispan.access;

import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import org.infinispan.util.concurrent.locks.LockUtil;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* With regular {@link org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor},
Expand All @@ -21,39 +22,33 @@
* Similar issue threatens consistency when the command has {@link org.infinispan.context.Flag#CACHE_MODE_LOCAL}
* - these commands don't acquire locks either.
*
* Therefore, this interceptor locks the entry in all situations but when it is sending message to primary owner
* (locking then could lead to deadlocks).
* Therefore, this interceptor locks the entry all the time. {@link UnorderedDistributionInterceptor} does not forward
* the message from non-origin to any other node, and the distribution interceptor won't block on RPC but will return
* {@link CompletableFuture} and we'll wait for it here.
*/
public class LockingInterceptor extends NonTransactionalLockingInterceptor {
@Override
protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable {
Object returnValue = null;
try {
// Clear any metadata; we'll set them as appropriate in TombstoneCallInterceptor
command.setMetadata(null);

boolean shouldLock;
if (hasSkipLocking(command)) {
shouldLock = false;
}
else if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
shouldLock = true;
}
else if (!ctx.isOriginLocal()) {
shouldLock = true;
}
else if (LockUtil.getLockOwnership(command.getKey(), cdl) == LockUtil.LockOwnership.PRIMARY) {
shouldLock = true;
}
else {
shouldLock = false;
}
if (shouldLock) {
lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command));
}
return invokeNextInterceptor(ctx, command);
lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command));

returnValue = invokeNextInterceptor(ctx, command);
return returnValue;
}
finally {
lockManager.unlockAll(ctx);
if (returnValue instanceof CompletableFuture) {
try {
((CompletableFuture) returnValue).join();
}
catch (CompletionException e) {
throw e.getCause();
}
}
}
}
}
Expand Up @@ -36,8 +36,7 @@ public class NonStrictAccessDelegate implements AccessDelegate {
private final BaseTransactionalDataRegion region;
private final AdvancedCache cache;
private final AdvancedCache writeCache;
private final AdvancedCache putFromLoadCacheLocal;
private final AdvancedCache putFromLoadCacheAsync;
private final AdvancedCache putFromLoadCache;
private final Comparator versionComparator;


Expand All @@ -46,8 +45,7 @@ public NonStrictAccessDelegate(BaseTransactionalDataRegion region) {
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
// Note that correct behaviour of local and async writes depends on LockingInterceptor (see there for details)
this.putFromLoadCacheLocal = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.CACHE_MODE_LOCAL );
this.putFromLoadCacheAsync = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.FORCE_ASYNCHRONOUS );
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.FORCE_ASYNCHRONOUS );
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("Nonstrict-read-write mode cannot use invalidation.");
Expand Down Expand Up @@ -115,8 +113,7 @@ else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev
}
// Apply the update locally first - if we're the backup owner, async propagation wouldn't change the value
// for the subsequent operation soon enough as it goes through primary owner
putFromLoadCacheAsync.put(key, value);
putFromLoadCacheLocal.put(key, value);
putFromLoadCache.put(key, value);
return true;
}

Expand Down
Expand Up @@ -29,21 +29,17 @@ public class TombstoneAccessDelegate implements AccessDelegate {
protected final BaseTransactionalDataRegion region;
protected final AdvancedCache cache;
protected final AdvancedCache writeCache;
protected final AdvancedCache localWriteCache;
protected final AdvancedCache asyncWriteCache;
protected final AdvancedCache putFromLoadCacheLocal;
protected final AdvancedCache putFromLoadCacheAsync;
protected final AdvancedCache putFromLoadCache;
protected final boolean requiresTransaction;

public TombstoneAccessDelegate(BaseTransactionalDataRegion region) {
this.region = region;
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
// Note that correct behaviour of local and async writes depends on LockingInterceptor (see there for details)
this.localWriteCache = Caches.localCache(writeCache);
this.asyncWriteCache = Caches.asyncWriteCache(writeCache, Flag.IGNORE_RETURN_VALUES);
this.putFromLoadCacheLocal = localWriteCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
this.putFromLoadCacheAsync = asyncWriteCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
this.asyncWriteCache = writeCache.withFlags(Flag.FORCE_ASYNCHRONOUS);
this.putFromLoadCache = asyncWriteCache.withFlags(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY);
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("For tombstone-based caching, invalidation cache is not allowed.");
Expand Down Expand Up @@ -101,11 +97,7 @@ else if (prev != null) {
}
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
// when it is present in the container. TombstoneCallInterceptor will deal with this.
TombstoneUpdate update = new TombstoneUpdate(session.getTimestamp(), value);
// If we're the backup owner, async propagation wouldn't change the value soon enough as it goes
// through primary owner - therefore we'll synchronously update it locally.
putFromLoadCacheAsync.put(key, update);
putFromLoadCacheLocal.put(key, update);
putFromLoadCache.put(key, new TombstoneUpdate(session.getTimestamp(), value));
return true;
}

Expand All @@ -128,7 +120,7 @@ public void remove(SessionImplementor session, Object key) throws CacheException

protected void write(SessionImplementor session, Object key, Object value) {
TransactionCoordinator tc = session.getTransactionCoordinator();
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, localWriteCache, asyncWriteCache, requiresTransaction, key, value, region, session.getTimestamp());
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, asyncWriteCache, requiresTransaction, key, value, region, session.getTimestamp());
// The update will be invalidating all putFromLoads for the duration of expiration or until removed by the synchronization
Tombstone tombstone = new Tombstone(sync.getUuid(), region.nextTimestamp() + region.getTombstoneExpiration());
// The outcome of this operation is actually defined in TombstoneCallInterceptor
Expand Down
@@ -0,0 +1,90 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;

import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.List;

/**
* Since the data handled in {@link TombstoneCallInterceptor} or {@link VersionedCallInterceptor}
* does not rely on the order how these are applied (the updates are commutative), this interceptor
* simply sends any command to all other owners without ordering them through primary owner.
* Note that {@link LockingInterceptor} is required in the stack as locking on backup is not guaranteed
* by primary owner.
*/
public class UnorderedDistributionInterceptor extends NonTxDistributionInterceptor {
private static Log log = LogFactory.getLog(UnorderedDistributionInterceptor.class);
private static final boolean trace = log.isTraceEnabled();

private DistributionManager distributionManager;
private RpcOptions syncRpcOptions, asyncRpcOptions;

@Inject
public void inject(DistributionManager distributionManager) {
this.distributionManager = distributionManager;
}

@Start
public void start() {
syncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
// We don't have to guarantee ordering even for asynchronous messages
asyncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
// for state-transfer related writes
return invokeNextInterceptor(ctx, command);
}
int commandTopologyId = command.getTopologyId();
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
if (commandTopologyId != -1 && currentTopologyId != commandTopologyId) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
commandTopologyId + ", got " + currentTopologyId);
}

ConsistentHash writeCH = distributionManager.getWriteConsistentHash();
List<Address> owners = null;
if (writeCH.isReplicated()) {
// local result is always ignored
invokeNextInterceptor(ctx, command);
}
else {
owners = writeCH.locateOwners(command.getKey());
if (owners.contains(rpcManager.getAddress())) {
invokeNextInterceptor(ctx, command);
}
else {
log.tracef("Not invoking %s on %s since it is not an owner", command, rpcManager.getAddress());
}
}

if (ctx.isOriginLocal() && command.isSuccessful()) {
// This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while
// holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after
// unlocking (and committing) the entry.
return rpcManager.invokeRemotelyAsync(owners, command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions);
}
return null;
}
}
Expand Up @@ -15,6 +15,7 @@
import org.hibernate.cache.infinispan.access.TombstoneAccessDelegate;
import org.hibernate.cache.infinispan.access.TombstoneCallInterceptor;
import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.UnorderedDistributionInterceptor;
import org.hibernate.cache.infinispan.access.VersionedCallInterceptor;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
Expand All @@ -33,11 +34,14 @@
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;

import javax.transaction.TransactionManager;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -151,7 +155,7 @@ protected void prepareForVersionedEntries() {
return;
}

replaceLockingInterceptor();
replaceCommonInterceptors();

cache.removeInterceptor(CallInterceptor.class);
VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator());
Expand All @@ -172,7 +176,7 @@ private void prepareForTombstones() {
log.evictionWithTombstones();
}

replaceLockingInterceptor();
replaceCommonInterceptors();

cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(this);
Expand All @@ -183,13 +187,35 @@ private void prepareForTombstones() {
strategy = Strategy.TOMBSTONES;
}

private void replaceLockingInterceptor() {
private void replaceCommonInterceptors() {
CacheMode cacheMode = cache.getCacheConfiguration().clustering().cacheMode();
if (!cacheMode.isReplicated() && !cacheMode.isDistributed()) {
return;
}

LockingInterceptor lockingInterceptor = new LockingInterceptor();
cache.getComponentRegistry().registerComponent(lockingInterceptor, LockingInterceptor.class);
if (!cache.addInterceptorBefore(lockingInterceptor, NonTransactionalLockingInterceptor.class)) {
throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain());
}
cache.removeInterceptor(NonTransactionalLockingInterceptor.class);

UnorderedDistributionInterceptor distributionInterceptor = new UnorderedDistributionInterceptor();
cache.getComponentRegistry().registerComponent(distributionInterceptor, UnorderedDistributionInterceptor.class);
if (!cache.addInterceptorBefore(distributionInterceptor, NonTxDistributionInterceptor.class)) {
throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain());
}
cache.removeInterceptor(NonTxDistributionInterceptor.class);

EntryWrappingInterceptor ewi = cache.getComponentRegistry().getComponent(EntryWrappingInterceptor.class);
try {
Field isUsingLockDelegation = EntryWrappingInterceptor.class.getDeclaredField("isUsingLockDelegation");
isUsingLockDelegation.setAccessible(true);
isUsingLockDelegation.set(ewi, false);
}
catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
}

public long getTombstoneExpiration() {
Expand Down

0 comments on commit f2f4c1a

Please sign in to comment.