Skip to content

Commit

Permalink
HHH-9868, HHH-9881 Must not write into non-transactional caches durin…
Browse files Browse the repository at this point in the history
…g transactional write

* The write can only invalidate (remove) the entry and block further PFERs of that entry
* After successful DB update, if there have not been any concurrent updates the value can be PFERed into the cache
  • Loading branch information
rvansa authored and galderz committed Aug 12, 2015
1 parent 19c14ce commit 93d39fa
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 173 deletions.
Expand Up @@ -78,19 +78,19 @@ private void start() {
@Override @Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) { if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, command.getKey()); return handleInvalidate(ctx, command, new Object[] { command.getKey() });
} }
return invokeNextInterceptor(ctx, command); return invokeNextInterceptor(ctx, command);
} }


@Override @Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey()); return handleInvalidate(ctx, command, new Object[] { command.getKey() });
} }


@Override @Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey()); return handleInvalidate(ctx, command, new Object[] { command.getKey() });
} }


@Override @Override
Expand All @@ -107,39 +107,39 @@ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) thr


@Override @Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray(); if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, keys); return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
}
return invokeNextInterceptor(ctx, command);
} }


private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable { private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command); Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && !ctx.isInTxScope()) { if (command.isSuccessful() && keys != null && keys.length != 0) {
if (keys != null && keys.length != 0) { invalidateAcrossCluster(command, keys);
if (!isLocalModeForced(command)) {
invalidateAcrossCluster(isSynchronous(command), keys, ctx);
}
}
} }
return retval; return retval;
} }


private void invalidateAcrossCluster(boolean synchronous, Object[] keys, InvocationContext ctx) throws Throwable { private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
// increment invalidations counter if statistics maintained // increment invalidations counter if statistics maintained
incrementInvalidations(); incrementInvalidations();
InvalidateCommand invalidateCommand; InvalidateCommand invalidateCommand;
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys); Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys);
if (lockOwner == null) { if (!isLocalModeForced(command)) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys); if (lockOwner == null) {
} invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
else { }
invalidateCommand = commandInitializer.buildBeginInvalidationCommand( else {
InfinispanCollections.<Flag>emptySet(), keys, lockOwner); invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
} InfinispanCollections.<Flag>emptySet(), keys, lockOwner);
if (log.isDebugEnabled()) { }
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); if (log.isDebugEnabled()) {
} log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
}


rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(synchronous)); rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(isSynchronous(command)));
}
} }


private void incrementInvalidations() { private void incrementInvalidations() {
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void injectDependencies(CacheCommandInitializer commandInitializer, RpcMa
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable { public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) { if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) { for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ((BeginInvalidationCommand) command).getLockOwner()); putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
} }
} }
return invokeNextInterceptor(ctx, command); return invokeNextInterceptor(ctx, command);
Expand Down
@@ -0,0 +1,86 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;

import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.hibernate.resource.transaction.spi.TransactionStatus;

/**
* Delegate for non-transactional caches
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class NonTxTransactionalAccessDelegate extends TransactionalAccessDelegate {
public NonTxTransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
super(region, validator);
}

@Override
@SuppressWarnings("UnusedParameters")
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
if ( !region.checkValid() ) {
return false;
}

// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}

@Override
@SuppressWarnings("UnusedParameters")
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException {
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.

// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}

@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
TransactionCoordinator tc = session.getTransactionCoordinator();
boolean doPFER = tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
if ( !putValidator.endInvalidatingKey(session, key, doPFER) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
}
}
}

0 comments on commit 93d39fa

Please sign in to comment.