Skip to content

Commit

Permalink
ISPN-5876 Invalidate keys on commit if 2-phase
Browse files Browse the repository at this point in the history
* Invalidate keys on commit if transaction is 2-phase, otherwise
  invalidate keys on prepare.
  • Loading branch information
galderz authored and danberindei committed Nov 2, 2015
1 parent b8f21d1 commit 4f9556c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
Expand Up @@ -34,6 +34,7 @@
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
Expand Down Expand Up @@ -134,18 +135,39 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t

@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
log.tracef("Entering InvalidationInterceptor's prepare phase. Ctx flags are empty");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
if (shouldInvokeRemoteTxCommand(ctx)) {
List<WriteCommand> mods = Arrays.asList(command.getModifications());
Transaction runningTransaction = ctx.getTransaction();
if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction");
broadcastInvalidateForPrepare(mods, runningTransaction, ctx);
} else {
log.tracef("Nothing to invalidate - no modifications in the transaction.");
if (command.isOnePhaseCommit()) {
Object retval = invokeNextInterceptor(ctx, command);
log.tracef("Entering InvalidationInterceptor's prepare phase. Ctx flags are empty");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
if (shouldInvokeRemoteTxCommand(ctx)) {
List<WriteCommand> mods = Arrays.asList(command.getModifications());
Transaction runningTransaction = ctx.getTransaction();
if (runningTransaction == null)
throw new IllegalStateException("we must have an associated transaction");
broadcastInvalidateForPrepare(mods, runningTransaction, ctx);
} else {
log.tracef("Nothing to invalidate - no modifications in the transaction.");
}
return retval;
}

return super.visitPrepareCommand(ctx, command);
}

@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
Object retval = super.visitCommitCommand(ctx, command);
Set<Object> affectedKeys = ctx.getAffectedKeys();
try {
log.tracef("On commit, send invalidate for keys: %s", affectedKeys);
invalidateAcrossCluster(defaultSynchronous, affectedKeys.toArray());
return retval;
} catch (Throwable t) {
if (t instanceof RuntimeException)
throw t;
else
throw log.unableToBroadcastInvalidation(t);
}
return retval;
}

@Override
Expand Down Expand Up @@ -232,7 +254,6 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t
}
}


protected void invalidateAcrossCluster(boolean synchronous, Object[] keys) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/infinispan/util/logging/Log.java
Expand Up @@ -850,5 +850,8 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout,
@LogMessage(level = ERROR)
@Message(value = "Failed to start rebalance for cache %s", id = 230)
void rebalanceStartError(String cacheName, @Cause Throwable cause);

@Message(value = "Unable to broadcast invalidation messages", id = 231)
RuntimeException unableToBroadcastInvalidation(@Cause Throwable e);
}

0 comments on commit 4f9556c

Please sign in to comment.